diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index f26f5464ff..3e7cf66a03 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -584,6 +584,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 { return float64(m().TimeseriesRepopulated) }) + metrics.NewGauge(`vm_timeseries_precreated_total`, func() float64 { + return float64(m().TimeseriesPreCreated) + }) metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 { return float64(m().NewTimeseriesCreated) }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e9bfc2b1d8..12bbb83e0c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,7 +27,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html). * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved). -* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) for details. +* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [the description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563) for details. +* FEATURE: reduce spikes in CPU and disk IO usage during `indexdb` rotation (aka inverted index), which is performed once per [`-retentionPeriod`](https://docs.victoriametrics.com/#retention). The new algorithm gradually pre-populates newly created `indexdb` during the last hour before the rotation. The number of pre-populated series in the newly created `indexdb` can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_timeseries_precreated_total` metric. This should resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow selecting time series matching at least one of multiple `or` filters. For example, `{env="prod",job="a" or env="dev",job="b"}` selects series with either `{env="prod",job="a"}` or `{env="dev",job="b"}` labels. This functionality allows passing the selected series to [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) without the need to use [subqueries](https://docs.victoriametrics.com/MetricsQL.html#subqueries). See [these docs](https://docs.victoriametrics.com/keyConcepts.html#filtering-by-multiple-or-filters). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to preserve metric names for binary operation results via `keep_metric_names` modifier. For example, `({__name__=~"foo|bar"} / 10) keep_metric_names` leaves `foo` and `bar` metric names in division results. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#keep_metric_names). This helps to address issues like [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3710). * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to copy all the labels from `one` side of [many-to-one operations](https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches) by specifying `*` inside `group_left()` or `group_right()`. Also allow adding a prefix for copied label names via `group_left(*) prefix "..."` syntax. For example, the following query copies Kubernetes namespace labels to `kube_pod_info` series and adds `ns_` prefix for the copied label names: `kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels`. The labels from `on()` list aren't prefixed. This feature resolves [this](https://stackoverflow.com/questions/76661818/how-to-add-namespace-labels-to-pod-labels-in-prometheus) and [that](https://stackoverflow.com/questions/76653997/how-can-i-make-a-new-copy-of-kube-namespace-labels-metric-with-a-different-name) questions at StackOverflow. diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 864ab9c1b9..a986f5924a 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -96,9 +96,6 @@ type indexDB struct { // and is used for syncing items from different indexDBs generation uint64 - // The unix timestamp in seconds for the indexDB rotation. - rotationTimestamp uint64 - name string tb *mergeset.Table @@ -136,10 +133,7 @@ func getTagFiltersCacheSize() int { // // The last segment of the path should contain unique hex value which // will be then used as indexDB.generation -// -// The rotationTimestamp must be set to the current unix timestamp when mustOpenIndexDB -// is called when creating new indexdb during indexdb rotation. -func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) *indexDB { +func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB { if s == nil { logger.Panicf("BUG: Storage must be nin-nil") } @@ -157,11 +151,10 @@ func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOn tagFiltersCacheSize := getTagFiltersCacheSize() db := &indexDB{ - refCount: 1, - generation: gen, - rotationTimestamp: rotationTimestamp, - tb: tb, - name: name, + refCount: 1, + generation: gen, + tb: tb, + name: name, tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize), s: s, diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 1f0d554639..53c38c88e3 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -511,7 +511,7 @@ func TestIndexDBOpenClose(t *testing.T) { tableName := nextIndexDBTableName() for i := 0; i < 5; i++ { var isReadOnly uint32 - db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly) + db := mustOpenIndexDB(tableName, &s, &isReadOnly) db.MustClose() } if err := os.RemoveAll(tableName); err != nil { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 77a0e967aa..50c85a94ef 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -45,6 +45,7 @@ type Storage struct { tooBigTimestampRows uint64 timeseriesRepopulated uint64 + timeseriesPreCreated uint64 newTimeseriesCreated uint64 slowRowInserts uint64 slowPerDayIndexInserts uint64 @@ -53,6 +54,13 @@ type Storage struct { hourlySeriesLimitRowsDropped uint64 dailySeriesLimitRowsDropped uint64 + // nextRotationTimestamp is a timestamp in seconds of the next indexdb rotation. + // + // It is used for gradual pre-population of the idbNext during the last hour before the indexdb rotation. + // in order to reduce spikes in CPU and disk IO usage just after the rotiation. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 + nextRotationTimestamp int64 + path string cachePath string retentionMsecs int64 @@ -60,8 +68,17 @@ type Storage struct { // lock file for exclusive access to the storage on the given path. flockF *os.File + // idbCurr contains the currently used indexdb. idbCurr atomic.Pointer[indexDB] + // idbNext is the next indexdb, which will become idbCurr at the next rotation. + // + // It is started to be gradually pre-populated with the data for active time series during the last hour + // before nextRotationTimestamp. + // This reduces spikes in CPU and disk IO usage just after the rotiation. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 + idbNext atomic.Pointer[indexDB] + tb *table // Series cardinality limiters. @@ -77,7 +94,8 @@ type Storage struct { // metricNameCache is MetricID -> MetricName cache. metricNameCache *workingsetcache.Cache - // dateMetricIDCache is (Date, MetricID) cache. + // dateMetricIDCache is (generation, Date, MetricID) cache, where generation is the indexdb generation. + // See generationTSID for details. dateMetricIDCache *dateMetricIDCache // Fast cache for MetricID values occurred during the current hour. @@ -213,9 +231,6 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - date := fasttime.UnixDate() - nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date) - s.nextDayMetricIDs.Store(nextDayMetricIDs) s.pendingNextDayMetricIDs = &uint64set.Set{} s.prefetchedMetricIDs.Store(&uint64set.Set{}) @@ -231,9 +246,23 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname) fs.MustMkdirIfNotExist(idbSnapshotsPath) fs.MustRemoveTemporaryDirs(idbSnapshotsPath) - idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath) + idbNext, idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath) + idbCurr.SetExtDB(idbPrev) + idbNext.SetExtDB(idbCurr) + s.idbCurr.Store(idbCurr) + s.idbNext.Store(idbNext) + + // Initialize nextRotationTimestamp + nowSecs := time.Now().UnixNano() / 1e9 + nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionMsecs/1000, retentionTimezoneOffsetSecs) + atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp) + + // Load nextDayMetricIDs cache + date := fasttime.UnixDate() + nextDayMetricIDs := s.mustLoadNextDayMetricIDs(idbCurr.generation, date) + s.nextDayMetricIDs.Store(nextDayMetricIDs) // Load deleted metricIDs from idbCurr and idbPrev dmisCurr, err := idbCurr.loadDeletedMetricIDs() @@ -461,6 +490,7 @@ type Metrics struct { TooBigTimestampRows uint64 TimeseriesRepopulated uint64 + TimeseriesPreCreated uint64 NewTimeseriesCreated uint64 SlowRowInserts uint64 SlowPerDayIndexInserts uint64 @@ -532,6 +562,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated) + m.TimeseriesPreCreated += atomic.LoadUint64(&s.timeseriesPreCreated) m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated) m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) @@ -602,12 +633,20 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len()) m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes()) - m.NextRetentionSeconds = uint64(nextRetentionDuration(s.retentionMsecs).Seconds()) + d := s.nextRetentionSeconds() + if d < 0 { + d = 0 + } + m.NextRetentionSeconds = uint64(d) s.idb().UpdateMetrics(&m.IndexDBMetrics) s.tb.UpdateMetrics(&m.TableMetrics) } +func (s *Storage) nextRetentionSeconds() int64 { + return atomic.LoadInt64(&s.nextRotationTimestamp) - int64(fasttime.UnixTimestamp()) +} + // SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path // // The function must be called before opening or creating any storage. @@ -664,11 +703,11 @@ func (s *Storage) startRetentionWatcher() { func (s *Storage) retentionWatcher() { for { - d := nextRetentionDuration(s.retentionMsecs) + d := s.nextRetentionSeconds() select { case <-s.stop: return - case <-time.After(d): + case <-time.After(time.Second * time.Duration(d)): s.mustRotateIndexDB() } } @@ -727,23 +766,29 @@ func (s *Storage) nextDayMetricIDsUpdater() { } func (s *Storage) mustRotateIndexDB() { - // Create new indexdb table. + // Create new indexdb table, which will be used as idbNext newTableName := nextIndexDBTableName() idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName) - rotationTimestamp := fasttime.UnixTimestamp() - idbNew := mustOpenIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly) + idbNew := mustOpenIndexDB(idbNewPath, s, &s.isReadOnly) - // Drop extDB + // Update nextRotationTimestamp + atomic.AddInt64(&s.nextRotationTimestamp, s.retentionMsecs/1000) + + // Set idbNext to idbNew + idbNext := s.idbNext.Load() + idbNew.SetExtDB(idbNext) + s.idbNext.Store(idbNew) + + // Set idbCurr to idbNext idbCurr := s.idb() + s.idbCurr.Store(idbNext) + + // Schedule data removal for idbPrev idbCurr.doExtDB(func(extDB *indexDB) { extDB.scheduleToDrop() }) idbCurr.SetExtDB(nil) - // Start using idbNew - idbNew.SetExtDB(idbCurr) - s.idbCurr.Store(idbNew) - // Persist changes on the file system. fs.MustSyncPath(s.path) @@ -753,7 +798,7 @@ func (s *Storage) mustRotateIndexDB() { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 // Flush metric id caches for the current and the previous hour, - // since they may contain entries missing in idbNew. + // since they may contain entries missing in idbCurr after the rotation. // This should prevent from missing data in queries when // the following steps are performed for short -retentionPeriod (e.g. 1 day): // @@ -763,7 +808,7 @@ func (s *Storage) mustRotateIndexDB() { // These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb. // 4. Stop adding new samples for these series just before 5 UTC. // 5. The next indexdb rotation is performed at 4 UTC next day. - // The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted, + // The information about the series added at step 3 disappears from indexdb, since the old indexdb from step 1 is deleted, // while the current indexdb doesn't contain information about the series. // So queries for the last 24 hours stop returning samples added at step 3. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 @@ -773,13 +818,12 @@ func (s *Storage) mustRotateIndexDB() { s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - // Flush dateMetricIDCache, so idbNew can be populated with fresh data. - s.dateMetricIDCache.Reset() + // Do not flush dateMetricIDCache, since it contains entries prefixed with idb generation. + + // There is no need in resetting nextDayMetricIDs, since it contains entries prefixed with idb generation. // Do not flush metricIDCache and metricNameCache, since all the metricIDs // from prev idb remain valid after the rotation. - - // There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day. } func (s *Storage) resetAndSaveTSIDCache() { @@ -833,11 +877,14 @@ func (s *Storage) MustClose() { } } -func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry { +func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *byDateMetricIDEntry { e := &byDateMetricIDEntry{ - date: date, + k: generationDateKey{ + generation: generation, + date: date, + }, } - name := "next_day_metric_ids" + name := "next_day_metric_ids_v2" path := filepath.Join(s.cachePath, name) if !fs.IsPathExist(path) { return e @@ -846,12 +893,17 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry { if err != nil { logger.Panicf("FATAL: cannot read %s: %s", path, err) } - if len(src) < 16 { - logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16) + if len(src) < 24 { + logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) return e } // Unmarshal header + generationLoaded := encoding.UnmarshalUint64(src) + src = src[8:] + if generationLoaded != generation { + logger.Infof("discarding %s, since it contains data for stale generation; got %d; want %d", path, generationLoaded, generation) + } dateLoaded := encoding.UnmarshalUint64(src) src = src[8:] if dateLoaded != date { @@ -948,12 +1000,13 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs } func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) { - name := "next_day_metric_ids" + name := "next_day_metric_ids_v2" path := filepath.Join(s.cachePath, name) dst := make([]byte, 0, e.v.Len()*8+16) // Marshal header - dst = encoding.MarshalUint64(dst, e.date) + dst = encoding.MarshalUint64(dst, e.k.generation) + dst = encoding.MarshalUint64(dst, e.k.date) // Marshal e.v dst = marshalUint64Set(dst, &e.v) @@ -1072,30 +1125,31 @@ var saveCacheLock sync.Mutex // SetRetentionTimezoneOffset sets the offset, which is used for calculating the time for indexdb rotation. // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574 func SetRetentionTimezoneOffset(offset time.Duration) { - retentionTimezoneOffsetMsecs = offset.Milliseconds() + retentionTimezoneOffsetSecs = int64(offset.Seconds()) } -var retentionTimezoneOffsetMsecs int64 +var retentionTimezoneOffsetSecs int64 -func nextRetentionDuration(retentionMsecs int64) time.Duration { - nowMsecs := time.Now().UnixNano() / 1e6 - return nextRetentionDurationAt(nowMsecs, retentionMsecs) -} +func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64 { + // Round retentionSecs to days. This guarantees that per-day inverted index works as expected + const secsPerDay = 24 * 3600 + retentionSecs = ((retentionSecs + secsPerDay - 1) / secsPerDay) * secsPerDay -func nextRetentionDurationAt(atMsecs int64, retentionMsecs int64) time.Duration { - // Round retentionMsecs to days. This guarantees that per-day inverted index works as expected - retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay + // Schedule the deadline to +4 hours from the next retention period start + // because of historical reasons - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 + offsetSecs -= 4 * 3600 - // The effect of time zone on retention period is moved out. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574 - deadline := ((atMsecs + retentionMsecs + retentionTimezoneOffsetMsecs - 1) / retentionMsecs) * retentionMsecs + // Make sure that offsetSecs doesn't exceed retentionSecs + offsetSecs %= retentionSecs - // Schedule the deadline to +4 hours from the next retention period start. - // This should prevent from possible double deletion of indexdb - // due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 . - deadline += int64(4 * 3600 * 1000) - deadline -= retentionTimezoneOffsetMsecs - return time.Duration(deadline-atMsecs) * time.Millisecond + // align the retention deadline to multiples of retentionSecs + // This makes the deadline independent of atSecs. + deadline := ((atSecs + offsetSecs + retentionSecs - 1) / retentionSecs) * retentionSecs + + // Apply the provided offsetSecs + deadline -= offsetSecs + + return deadline } // SearchMetricNames returns marshaled metric names matching the given tfss on the given tr. @@ -1661,6 +1715,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { var seriesRepopulated uint64 idb := s.idb() + generation := idb.generation is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) var firstWarn error @@ -1673,7 +1728,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { // Skip row, since it exceeds cardinality limit continue } - if genTSID.generation != idb.generation { + if genTSID.generation < generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { @@ -1688,7 +1743,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { mn.sortTags() createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ } @@ -1718,10 +1773,10 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { continue } - if genTSID.generation != idb.generation { + if genTSID.generation < generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation seriesRepopulated++ } s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) @@ -1739,12 +1794,15 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { // Schedule creating TSID indexes instead of creating them synchronously. // This should keep stable the ingestion rate when new time series are ingested. createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) } atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + // There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called. + // So it is OK to register metric names in blocking manner after indexdb rotation. + if firstWarn != nil { logger.Warnf("cannot create some metrics: %s", firstWarn) } @@ -1752,6 +1810,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error { idb := s.idb() + generation := idb.generation is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) @@ -1835,7 +1894,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci prevTSID = r.TSID prevMetricNameRaw = mr.MetricNameRaw - if genTSID.generation != idb.generation { + if genTSID.generation < generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. date := uint64(r.Timestamp) / msecPerDay @@ -1849,7 +1908,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci mn.sortTags() createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ slowInsertsCount++ @@ -1883,10 +1942,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci continue } - if genTSID.generation != idb.generation { + if genTSID.generation < generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation seriesRepopulated++ } s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) @@ -1907,7 +1966,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci } createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) - genTSID.generation = idb.generation + genTSID.generation = generation s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) newSeriesCount++ @@ -1924,11 +1983,17 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount) atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + dstMrs = dstMrs[:j] + rows = rows[:j] + + if err := s.prefillNextIndexDB(rows, dstMrs); err != nil { + if firstWarn == nil { + firstWarn = err + } + } if firstWarn != nil { storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn) } - dstMrs = dstMrs[:j] - rows = rows[:j] err := s.updatePerDateData(rows, dstMrs) if err != nil { @@ -1963,9 +2028,9 @@ func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID // so future rows for that TSID are ingested via fast path. s.putTSIDToCache(genTSID, metricNameRaw) - // Register the (date, metricID) entry in the cache, + // Register the (generation, date, metricID) entry in the cache, // so next time the entry is found there instead of searching for it in the indexdb. - s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID) + s.dateMetricIDCache.Set(genTSID.generation, date, genTSID.TSID.MetricID) } func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool { @@ -2004,6 +2069,77 @@ func getUserReadableMetricName(metricNameRaw []byte) string { return mn.String() } +func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error { + d := s.nextRetentionSeconds() + if d >= 3600 { + // Fast path: nothing to pre-fill because it is too early. + // The pre-fill is started during the last hour before the indexdb rotation. + return nil + } + + // Slower path: less than hour left for the next indexdb rotation. + // Pre-populate idbNext with the increasing probability until the rotation. + // The probability increases from 0% to 100% proportioinally to d=[3600 .. 0]. + pMin := float64(d) / 3600 + + idbNext := s.idbNext.Load() + generation := idbNext.generation + isNext := idbNext.getIndexSearch(0, 0, noDeadline) + defer idbNext.putIndexSearch(isNext) + + var firstError error + var genTSID generationTSID + mn := GetMetricName() + defer PutMetricName(mn) + + timeseriesPreCreated := uint64(0) + for i := range rows { + r := &rows[i] + p := float64(uint32(fastHashUint64(r.TSID.MetricID))) / (1 << 32) + if p < pMin { + // Fast path: it is too early to pre-fill indexes for the given MetricID. + continue + } + + // Check whether the given MetricID is already present in dateMetricIDCache. + date := uint64(r.Timestamp) / msecPerDay + metricID := r.TSID.MetricID + if s.dateMetricIDCache.Has(generation, date, metricID) { + // Indexes are already pre-filled. + continue + } + + // Check whether the given (date, metricID) is already present in idbNext. + if isNext.hasDateMetricIDNoExtDB(date, metricID, r.TSID.AccountID, r.TSID.ProjectID) { + // Indexes are already pre-filled at idbNext. + // + // Register the (generation, date, metricID) entry in the cache, + // so next time the entry is found there instead of searching for it in the indexdb. + s.dateMetricIDCache.Set(generation, date, metricID) + continue + } + + // Slow path: pre-fill indexes in idbNext. + metricNameRaw := mrs[i].MetricNameRaw + if err := mn.UnmarshalRaw(metricNameRaw); err != nil { + if firstError == nil { + firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err) + } + continue + } + mn.sortTags() + + createAllIndexesForMetricName(isNext, mn, &r.TSID, date) + genTSID.TSID = r.TSID + genTSID.generation = generation + s.putSeriesToCache(metricNameRaw, &genTSID, date) + timeseriesPreCreated++ + } + atomic.AddUint64(&s.timeseriesPreCreated, timeseriesPreCreated) + + return firstError +} + func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { var date uint64 var hour uint64 @@ -2014,6 +2150,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { prevDate uint64 prevMetricID uint64 ) + + idb := s.idb() + generation := idb.generation + hm := s.currHourMetricIDs.Load() hmPrev := s.prevHourMetricIDs.Load() hmPrevDate := hmPrev.hour / 24 @@ -2079,8 +2219,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } } - // Slower path: check global cache for (date, metricID) entry. - if s.dateMetricIDCache.Has(date, metricID) { + // Slower path: check global cache for (generation, date, metricID) entry. + if s.dateMetricIDCache.Has(generation, date, metricID) { continue } // Slow path: store the (date, metricID) entry in the indexDB. @@ -2124,7 +2264,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { return a.tsid.MetricID < b.tsid.MetricID }) - idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) @@ -2154,7 +2293,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } PutMetricName(mn) // The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB. - s.dateMetricIDCache.Store(dateMetricIDsForCache) + s.dateMetricIDCache.Store(generation, dateMetricIDsForCache) return firstError } @@ -2188,12 +2327,6 @@ func newDateMetricIDCache() *dateMetricIDCache { return &dmc } -func (dmc *dateMetricIDCache) Reset() { - dmc.mu.Lock() - dmc.resetLocked() - dmc.mu.Unlock() -} - func (dmc *dateMetricIDCache) resetLocked() { // Do not reset syncsCount and resetsCount dmc.byDate.Store(newByDateMetricIDMap()) @@ -2221,9 +2354,9 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 { return n } -func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { +func (dmc *dateMetricIDCache) Has(generation, date, metricID uint64) bool { byDate := dmc.byDate.Load() - v := byDate.get(date) + v := byDate.get(generation, date) if v.Has(metricID) { // Fast path. // The majority of calls must go here. @@ -2232,7 +2365,7 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { // Slow path. Check mutable map. dmc.mu.Lock() - v = dmc.byDateMutable.get(date) + v = dmc.byDateMutable.get(generation, date) ok := v.Has(metricID) dmc.syncLockedIfNeeded() dmc.mu.Unlock() @@ -2245,7 +2378,7 @@ type dateMetricID struct { metricID uint64 } -func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) { +func (dmc *dateMetricIDCache) Store(generation uint64, dmids []dateMetricID) { var prevDate uint64 metricIDs := make([]uint64, 0, len(dmids)) dmc.mu.Lock() @@ -2255,22 +2388,22 @@ func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) { continue } if len(metricIDs) > 0 { - v := dmc.byDateMutable.getOrCreate(prevDate) + v := dmc.byDateMutable.getOrCreate(generation, prevDate) v.AddMulti(metricIDs) } metricIDs = append(metricIDs[:0], dmid.metricID) prevDate = dmid.date } if len(metricIDs) > 0 { - v := dmc.byDateMutable.getOrCreate(prevDate) + v := dmc.byDateMutable.getOrCreate(generation, prevDate) v.AddMulti(metricIDs) } dmc.mu.Unlock() } -func (dmc *dateMetricIDCache) Set(date, metricID uint64) { +func (dmc *dateMetricIDCache) Set(generation, date, metricID uint64) { dmc.mu.Lock() - v := dmc.byDateMutable.getOrCreate(date) + v := dmc.byDateMutable.getOrCreate(generation, date) v.Add(metricID) dmc.mu.Unlock() } @@ -2288,31 +2421,38 @@ func (dmc *dateMetricIDCache) syncLocked() { // Nothing to sync. return } + + // Merge data from byDate into byDateMutable and then atomically replace byDate with the merged data. byDate := dmc.byDate.Load() byDateMutable := dmc.byDateMutable - for date, e := range byDateMutable.m { - v := byDate.get(date) + for k, e := range byDateMutable.m { + v := byDate.get(k.generation, k.date) if v == nil { + // Nothing to merge continue } v = v.Clone() v.Union(&e.v) dme := &byDateMetricIDEntry{ - date: date, - v: *v, + k: k, + v: *v, } - if date == byDateMutable.hotEntry.Load().date { + byDateMutable.m[k] = dme + he := byDateMutable.hotEntry.Load() + if he.k == k { byDateMutable.hotEntry.Store(dme) } - byDateMutable.m[date] = dme } - for date, e := range byDate.m { - v := byDateMutable.get(date) + // Copy entries from byDate, which are missing in byDateMutable + for k, e := range byDate.m { + v := byDateMutable.get(k.generation, k.date) if v != nil { continue } - byDateMutable.m[date] = e + byDateMutable.m[k] = e } + + // Atomically replace byDate with byDateMutable dmc.byDate.Store(dmc.byDateMutable) dmc.byDateMutable = newByDateMetricIDMap() @@ -2325,25 +2465,34 @@ func (dmc *dateMetricIDCache) syncLocked() { type byDateMetricIDMap struct { hotEntry atomic.Pointer[byDateMetricIDEntry] - m map[uint64]*byDateMetricIDEntry + m map[generationDateKey]*byDateMetricIDEntry +} + +type generationDateKey struct { + generation uint64 + date uint64 } func newByDateMetricIDMap() *byDateMetricIDMap { dmm := &byDateMetricIDMap{ - m: make(map[uint64]*byDateMetricIDEntry), + m: make(map[generationDateKey]*byDateMetricIDEntry), } dmm.hotEntry.Store(&byDateMetricIDEntry{}) return dmm } -func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set { +func (dmm *byDateMetricIDMap) get(generation, date uint64) *uint64set.Set { hotEntry := dmm.hotEntry.Load() - if hotEntry.date == date { + if hotEntry.k.generation == generation && hotEntry.k.date == date { // Fast path return &hotEntry.v } // Slow path - e := dmm.m[date] + k := generationDateKey{ + generation: generation, + date: date, + } + e := dmm.m[k] if e == nil { return nil } @@ -2351,36 +2500,41 @@ func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set { return &e.v } -func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set { - v := dmm.get(date) +func (dmm *byDateMetricIDMap) getOrCreate(generation, date uint64) *uint64set.Set { + v := dmm.get(generation, date) if v != nil { return v } - e := &byDateMetricIDEntry{ - date: date, + k := generationDateKey{ + generation: generation, + date: date, } - dmm.m[date] = e + e := &byDateMetricIDEntry{ + k: k, + } + dmm.m[k] = e return &e.v } type byDateMetricIDEntry struct { - date uint64 - v uint64set.Set + k generationDateKey + v uint64set.Set } func (s *Storage) updateNextDayMetricIDs(date uint64) { + generation := s.idb().generation e := s.nextDayMetricIDs.Load() s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs s.pendingNextDayMetricIDs = &uint64set.Set{} s.pendingNextDayMetricIDsLock.Unlock() - if pendingMetricIDs.Len() == 0 && e.date == date { + if pendingMetricIDs.Len() == 0 && e.k.generation == generation && e.k.date == date { // Fast path: nothing to update. return } // Slow path: union pendingMetricIDs with e.v - if e.date == date { + if e.k.generation == generation && e.k.date == date { pendingMetricIDs.Union(&e.v) } else { // Do not add pendingMetricIDs from the previous day to the current day, @@ -2388,9 +2542,13 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) { // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 pendingMetricIDs = &uint64set.Set{} } + k := generationDateKey{ + generation: generation, + date: date, + } eNew := &byDateMetricIDEntry{ - date: date, - v: *pendingMetricIDs, + k: k, + v: *pendingMetricIDs, } s.nextDayMetricIDs.Store(eNew) } @@ -2486,12 +2644,11 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) { +func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB) { fs.MustMkdirIfNotExist(path) fs.MustRemoveTemporaryDirs(path) - // Search for the two most recent tables - the last one is active, - // the previous one contains backup data. + // Search for the three most recent tables - the prev, curr and next. des := fs.MustReadDir(path) var tableNames []string for _, de := range des { @@ -2509,37 +2666,42 @@ func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) { sort.Slice(tableNames, func(i, j int) bool { return tableNames[i] < tableNames[j] }) - if len(tableNames) < 2 { - // Create missing tables - if len(tableNames) == 0 { - prevName := nextIndexDBTableName() - tableNames = append(tableNames, prevName) - } + switch len(tableNames) { + case 0: + prevName := nextIndexDBTableName() currName := nextIndexDBTableName() - tableNames = append(tableNames, currName) + nextName := nextIndexDBTableName() + tableNames = append(tableNames, prevName, currName, nextName) + case 1: + currName := nextIndexDBTableName() + nextName := nextIndexDBTableName() + tableNames = append(tableNames, currName, nextName) + case 2: + nextName := nextIndexDBTableName() + tableNames = append(tableNames, nextName) + default: + // Remove all the tables except the last three tables. + for _, tn := range tableNames[:len(tableNames)-3] { + pathToRemove := filepath.Join(path, tn) + logger.Infof("removing obsolete indexdb dir %q...", pathToRemove) + fs.MustRemoveAll(pathToRemove) + logger.Infof("removed obsolete indexdb dir %q", pathToRemove) + } + fs.MustSyncPath(path) + + tableNames = tableNames[len(tableNames)-3:] } - // Invariant: len(tableNames) >= 2 + // Open tables + nextPath := filepath.Join(path, tableNames[2]) + currPath := filepath.Join(path, tableNames[1]) + prevPath := filepath.Join(path, tableNames[0]) - // Remove all the tables except two last tables. - for _, tn := range tableNames[:len(tableNames)-2] { - pathToRemove := filepath.Join(path, tn) - logger.Infof("removing obsolete indexdb dir %q...", pathToRemove) - fs.MustRemoveAll(pathToRemove) - logger.Infof("removed obsolete indexdb dir %q", pathToRemove) - } + next = mustOpenIndexDB(nextPath, s, &s.isReadOnly) + curr = mustOpenIndexDB(currPath, s, &s.isReadOnly) + prev = mustOpenIndexDB(prevPath, s, &s.isReadOnly) - // Persist changes on the file system. - fs.MustSyncPath(path) - - // Open the last two tables. - currPath := filepath.Join(path, tableNames[len(tableNames)-1]) - - curr = mustOpenIndexDB(currPath, s, 0, &s.isReadOnly) - prevPath := filepath.Join(path, tableNames[len(tableNames)-2]) - prev = mustOpenIndexDB(prevPath, s, 0, &s.isReadOnly) - - return curr, prev + return next, curr, prev } var indexDBTableNameRegexp = regexp.MustCompile("^[0-9A-F]{16}$") diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 01a5c00bdc..7b8afe21ea 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -90,23 +90,25 @@ func TestDateMetricIDCacheConcurrent(t *testing.T) { func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { type dmk struct { - date uint64 - metricID uint64 + generation uint64 + date uint64 + metricID uint64 } m := make(map[dmk]bool) for i := 0; i < 1e5; i++ { + generation := uint64(i) % 2 date := uint64(i) % 3 metricID := uint64(i) % 1237 - if !concurrent && c.Has(date, metricID) { - if !m[dmk{date, metricID}] { - return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID) + if !concurrent && c.Has(generation, date, metricID) { + if !m[dmk{generation, date, metricID}] { + return fmt.Errorf("c.Has(%d, %d, %d) must return false, but returned true", generation, date, metricID) } continue } - c.Set(date, metricID) - m[dmk{date, metricID}] = true - if !concurrent && !c.Has(date, metricID) { - return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID) + c.Set(generation, date, metricID) + m[dmk{generation, date, metricID}] = true + if !concurrent && !c.Has(generation, date, metricID) { + return fmt.Errorf("c.Has(%d, %d, %d) must return true, but returned false", generation, date, metricID) } if i%11234 == 0 { c.mu.Lock() @@ -114,25 +116,29 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { c.mu.Unlock() } if i%34323 == 0 { - c.Reset() + c.mu.Lock() + c.resetLocked() + c.mu.Unlock() m = make(map[dmk]bool) } } // Verify fast path after sync. for i := 0; i < 1e5; i++ { + generation := uint64(i) % 2 date := uint64(i) % 3 metricID := uint64(i) % 123 - c.Set(date, metricID) + c.Set(generation, date, metricID) } c.mu.Lock() c.syncLocked() c.mu.Unlock() for i := 0; i < 1e5; i++ { + generation := uint64(i) % 2 date := uint64(i) % 3 metricID := uint64(i) % 123 - if !concurrent && !c.Has(date, metricID) { - return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID) + if !concurrent && !c.Has(generation, date, metricID) { + return fmt.Errorf("c.Has(%d, %d, %d) must return true after sync", generation, date, metricID) } } @@ -140,7 +146,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { if n := c.EntriesCount(); !concurrent && n < 123 { return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n) } - c.Reset() + c.mu.Lock() + c.resetLocked() + c.mu.Unlock() if n := c.EntriesCount(); !concurrent && n > 0 { return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n) } @@ -493,45 +501,53 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) { } } -func TestNextRetentionDuration(t *testing.T) { - validateRetention := func(retention int64) { +func TestNextRetentionDeadlineSeconds(t *testing.T) { + f := func(currentTime string, retention, offset time.Duration, deadlineExpected string) { t.Helper() - validateRetentionAt := func(now time.Time, retention int64) { - nowMsecs := now.UnixMilli() - d := nextRetentionDurationAt(nowMsecs, retention) - if d <= 0 { - nextTime := now.Add(d) - retentionHuman := time.Duration(retention) * time.Millisecond - t.Errorf("unexpected retention duration for retention=%s; got %s(%s); must be %s + %s; offset: %s", retentionHuman, nextTime, d, now, retentionHuman, time.Duration(retentionTimezoneOffsetMsecs)*time.Millisecond) - } + + now, err := time.Parse(time.RFC3339, currentTime) + if err != nil { + t.Fatalf("cannot parse currentTime=%q: %s", currentTime, err) } - // UTC offsets are in range [-12 hours, +14 hours]. - // Verify that any legit combination of retention timezone and local time - // will return valid retention duration. - // See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4207 - for retentionOffset := -12; retentionOffset <= 14; retentionOffset++ { - SetRetentionTimezoneOffset(time.Duration(retentionOffset) * time.Hour) - validateRetentionAt(time.Now().UTC(), retention) - - now := time.Date(2023, 4, 27, 23, 58, 0, 0, time.UTC) - validateRetentionAt(now, retention) - - now = time.Date(2023, 4, 27, 0, 1, 0, 0, time.UTC) - validateRetentionAt(now, retention) - - now = time.Date(2023, 4, 27, 0, 0, 0, 0, time.UTC) - validateRetentionAt(now, retention) + d := nextRetentionDeadlineSeconds(now.Unix(), int64(retention.Seconds()), int64(offset.Seconds())) + deadline := time.Unix(d, 0).UTC().Format(time.RFC3339) + if deadline != deadlineExpected { + t.Fatalf("unexpected deadline; got %s; want %s", deadline, deadlineExpected) } } - for retentionDays := 0.3; retentionDays < 3; retentionDays += 0.3 { - validateRetention(int64(retentionDays * msecPerDay)) - } + f("2023-07-22T12:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z") + f("2023-07-22T03:44:35Z", 24*time.Hour, 0, "2023-07-22T04:00:00Z") + f("2023-07-22T04:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z") + f("2023-07-22T23:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z") + f("2023-07-23T03:59:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z") - for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 { - validateRetention(int64(retentionMonths * msecsPerMonth)) - } + f("2023-07-22T12:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z") + f("2023-07-22T01:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-22T02:00:00Z") + f("2023-07-22T02:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z") + f("2023-07-22T23:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z") + f("2023-07-23T01:59:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z") + + f("2023-07-22T12:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z") + f("2023-07-22T08:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-22T09:00:00Z") + f("2023-07-22T09:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z") + + f("2023-07-22T12:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z") + f("2023-07-22T15:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z") + f("2023-07-22T16:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-23T16:00:00Z") + + f("2023-07-22T12:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z") + f("2023-07-22T21:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z") + f("2023-07-22T22:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-23T22:00:00Z") + + f("2023-07-22T12:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z") + f("2023-07-22T09:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-22T10:00:00Z") + f("2023-07-22T10:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z") + + f("2023-07-22T12:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z") + f("2023-07-22T14:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z") + f("2023-07-22T15:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-23T15:00:00Z") } func TestStorageOpenClose(t *testing.T) {