diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index 3b64d3212..9d51ec653 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -6,6 +6,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -15,7 +16,8 @@ import ( var ( flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+ - "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage") + "Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+ + "Minimum supported interval is 1 second") maxUnpackedBlockSize = flag.Int("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+ "It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics") ) @@ -55,6 +57,10 @@ func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) { } func (ps *pendingSeries) periodicFlusher() { + flushSeconds := int64(flushInterval.Seconds()) + if flushSeconds <= 0 { + flushSeconds = 1 + } ticker := time.NewTicker(*flushInterval) defer ticker.Stop() mustStop := false @@ -63,7 +69,7 @@ func (ps *pendingSeries) periodicFlusher() { case <-ps.stopCh: mustStop = true case <-ticker.C: - if time.Since(ps.wr.lastFlushTime) < *flushInterval/2 { + if fasttime.UnixTimestamp()-ps.wr.lastFlushTime < uint64(flushSeconds) { continue } } @@ -76,7 +82,7 @@ func (ps *pendingSeries) periodicFlusher() { type writeRequest struct { wr prompbmarshal.WriteRequest pushBlock func(block []byte) - lastFlushTime time.Time + lastFlushTime uint64 tss []prompbmarshal.TimeSeries @@ -108,7 +114,7 @@ func (wr *writeRequest) reset() { func (wr *writeRequest) flush() { wr.wr.Timeseries = wr.tss - wr.lastFlushTime = time.Now() + wr.lastFlushTime = fasttime.UnixTimestamp() pushWriteRequest(&wr.wr, wr.pushBlock) wr.reset() } diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 77fb4ae08..b394eb621 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -99,7 +100,7 @@ func main() { go func() { // init reload metrics with positive values to improve alerting conditions configSuccess.Set(1) - configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9) + configTimestamp.Set(fasttime.UnixTimestamp()) sigHup := procutil.NewSighupChan() for { <-sigHup @@ -112,7 +113,7 @@ func main() { continue } configSuccess.Set(1) - configTimestamp.Set(uint64(time.Now().UnixNano()) / 1e9) + configTimestamp.Set(fasttime.UnixTimestamp()) logger.Infof("Rules reloaded successfully from %q", *rulePath) } }() diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 13cd8e2a7..5b704fa13 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" @@ -395,14 +396,14 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %s", err) } - date := time.Now().Unix() / secsPerDay + date := fasttime.UnixDate() dateStr := r.FormValue("date") if len(dateStr) > 0 { t, err := time.Parse("2006-01-02", dateStr) if err != nil { return fmt.Errorf("cannot parse `date` arg %q: %s", dateStr, err) } - date = t.Unix() / secsPerDay + date = uint64(t.Unix()) / secsPerDay } topN := 10 topNStr := r.FormValue("topN") @@ -419,7 +420,7 @@ func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque } topN = n } - status, err := netstorage.GetTSDBStatusForDate(deadline, uint64(date), topN) + status, err := netstorage.GetTSDBStatusForDate(deadline, date, topN) if err != nil { return fmt.Errorf(`cannot obtain tsdb status for date=%d, topN=%d: %s`, date, topN, err) } @@ -992,7 +993,7 @@ func getBool(r *http.Request, argKey string) bool { } func currentTime() int64 { - return int64(time.Now().UTC().Unix()) * 1e3 + return int64(fasttime.UnixTimestamp() * 1000) } func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) { diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 32b9070aa..5b41bb05f 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" @@ -64,18 +65,18 @@ func InitRollupResultCache(cachePath string) { stats := &fastcache.Stats{} var statsLock sync.Mutex - var statsLastUpdate time.Time + var statsLastUpdate uint64 fcs := func() *fastcache.Stats { statsLock.Lock() defer statsLock.Unlock() - if time.Since(statsLastUpdate) < time.Second { + if fasttime.UnixTimestamp()-statsLastUpdate < 2 { return stats } var fcs fastcache.Stats c.UpdateStats(&fcs) stats = &fcs - statsLastUpdate = time.Now() + statsLastUpdate = fasttime.UnixTimestamp() return stats } if len(rollupResultCachePath) > 0 { diff --git a/lib/fasttime/fasttime.go b/lib/fasttime/fasttime.go new file mode 100644 index 000000000..f50a27f7f --- /dev/null +++ b/lib/fasttime/fasttime.go @@ -0,0 +1,40 @@ +package fasttime + +import ( + "sync/atomic" + "time" +) + +func init() { + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for tm := range ticker.C { + t := uint64(tm.Unix()) + atomic.StoreUint64(¤tTimestamp, t) + } + }() +} + +var currentTimestamp = uint64(time.Now().Unix()) + +// UnixTimestamp returns the current unix timestamp in seconds. +// +// It is faster than time.Now().Unix() +func UnixTimestamp() uint64 { + return atomic.LoadUint64(¤tTimestamp) +} + +// UnixDate returns date from the current unix timestamp. +// +// The date is calculated by dividing unix timestamp by (24*3600) +func UnixDate() uint64 { + return UnixTimestamp() / (24 * 3600) +} + +// UnixHour returns hour from the current unix timestamp. +// +// The hour is calculated by dividing unix timestamp by 3600 +func UnixHour() uint64 { + return UnixTimestamp() / 3600 +} diff --git a/lib/fasttime/fasttime_test.go b/lib/fasttime/fasttime_test.go new file mode 100644 index 000000000..ad3f2cf94 --- /dev/null +++ b/lib/fasttime/fasttime_test.go @@ -0,0 +1,30 @@ +package fasttime + +import ( + "testing" + "time" +) + +func TestUnixTimestamp(t *testing.T) { + tsExpected := uint64(time.Now().Unix()) + ts := UnixTimestamp() + if ts-tsExpected > 1 { + t.Fatalf("unexpected UnixTimestamp; got %d; want %d", ts, tsExpected) + } +} + +func TestUnixDate(t *testing.T) { + dateExpected := uint64(time.Now().Unix() / (24 * 3600)) + date := UnixDate() + if date-dateExpected > 1 { + t.Fatalf("unexpected UnixDate; got %d; want %d", date, dateExpected) + } +} + +func TestUnixHour(t *testing.T) { + hourExpected := uint64(time.Now().Unix() / 3600) + hour := UnixHour() + if hour-hourExpected > 1 { + t.Fatalf("unexpected UnixHour; got %d; want %d", hour, hourExpected) + } +} diff --git a/lib/fasttime/fasttime_timing_test.go b/lib/fasttime/fasttime_timing_test.go new file mode 100644 index 000000000..bd27085c8 --- /dev/null +++ b/lib/fasttime/fasttime_timing_test.go @@ -0,0 +1,32 @@ +package fasttime + +import ( + "sync/atomic" + "testing" + "time" +) + +func BenchmarkUnixTimestamp(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var ts uint64 + for pb.Next() { + ts += UnixTimestamp() + } + atomic.StoreUint64(&Sink, ts) + }) +} + +func BenchmarkTimeNowUnix(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var ts uint64 + for pb.Next() { + ts += uint64(time.Now().Unix()) + } + atomic.StoreUint64(&Sink, ts) + }) +} + +// Sink should prevent from code elimination by optimizing compiler +var Sink uint64 diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 9482ed6f0..c04a95ffb 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -227,7 +228,7 @@ func (idxbc *indexBlockCache) cleaner() { } func (idxbc *indexBlockCache) cleanByTimeout() { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() idxbc.mu.Lock() for k, idxbe := range idxbc.m { // Delete items accessed more than 10 minutes ago. @@ -245,7 +246,7 @@ func (idxbc *indexBlockCache) Get(k uint64) *indexBlock { idxbc.mu.RUnlock() if idxbe != nil { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() if atomic.LoadUint64(&idxbe.lastAccessTime) != currentTime { atomic.StoreUint64(&idxbe.lastAccessTime, currentTime) } @@ -276,7 +277,7 @@ func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) { // Store idxb in the cache. idxbe := &indexBlockCacheEntry{ - lastAccessTime: atomic.LoadUint64(¤tTimestamp), + lastAccessTime: fasttime.UnixTimestamp(), idxb: idxb, } idxbc.m[k] = idxbe @@ -374,7 +375,7 @@ func (ibc *inmemoryBlockCache) cleaner() { } func (ibc *inmemoryBlockCache) cleanByTimeout() { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() ibc.mu.Lock() for k, ibe := range ibc.m { // Delete items accessed more than 10 minutes ago. @@ -393,7 +394,7 @@ func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock { ibc.mu.RUnlock() if ibe != nil { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime { atomic.StoreUint64(&ibe.lastAccessTime, currentTime) } @@ -424,7 +425,7 @@ func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) { // Store ib in the cache. ibe := &inmemoryBlockCacheEntry{ - lastAccessTime: atomic.LoadUint64(¤tTimestamp), + lastAccessTime: fasttime.UnixTimestamp(), ib: ib, } ibc.m[k] = ibe @@ -445,16 +446,3 @@ func (ibc *inmemoryBlockCache) Requests() uint64 { func (ibc *inmemoryBlockCache) Misses() uint64 { return atomic.LoadUint64(&ibc.misses) } - -func init() { - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for tm := range ticker.C { - t := uint64(tm.Unix()) - atomic.StoreUint64(¤tTimestamp, t) - } - }() -} - -var currentTimestamp uint64 diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 7467943c3..b4fedec40 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -13,6 +13,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -102,7 +103,7 @@ type Table struct { rawItemsBlocks []*inmemoryBlock rawItemsLock sync.Mutex - rawItemsLastFlushTime time.Time + rawItemsLastFlushTime uint64 snapshotLock sync.RWMutex @@ -369,7 +370,7 @@ func (tb *Table) AddItems(items [][]byte) error { if len(tb.rawItemsBlocks) >= 1024 { blocksToMerge = tb.rawItemsBlocks tb.rawItemsBlocks = nil - tb.rawItemsLastFlushTime = time.Now() + tb.rawItemsLastFlushTime = fasttime.UnixTimestamp() } tb.rawItemsLock.Unlock() @@ -508,11 +509,15 @@ func (tb *Table) flushRawItems(isFinal bool) { defer tb.rawItemsPendingFlushesWG.Done() mustFlush := false - currentTime := time.Now() + currentTime := fasttime.UnixTimestamp() + flushSeconds := int64(rawItemsFlushInterval.Seconds()) + if flushSeconds <= 0 { + flushSeconds = 1 + } var blocksToMerge []*inmemoryBlock tb.rawItemsLock.Lock() - if isFinal || currentTime.Sub(tb.rawItemsLastFlushTime) > rawItemsFlushInterval { + if isFinal || currentTime-tb.rawItemsLastFlushTime > uint64(flushSeconds) { mustFlush = true blocksToMerge = tb.rawItemsBlocks tb.rawItemsBlocks = nil @@ -674,7 +679,7 @@ const ( func (tb *Table) partMerger() error { sleepTime := minMergeSleepTime - var lastMergeTime time.Time + var lastMergeTime uint64 isFinal := false t := time.NewTimer(sleepTime) for { @@ -682,7 +687,7 @@ func (tb *Table) partMerger() error { if err == nil { // Try merging additional parts. sleepTime = minMergeSleepTime - lastMergeTime = time.Now() + lastMergeTime = fasttime.UnixTimestamp() isFinal = false continue } @@ -693,10 +698,10 @@ func (tb *Table) partMerger() error { if err != errNothingToMerge { return err } - if time.Since(lastMergeTime) > 30*time.Second { + if fasttime.UnixTimestamp()-lastMergeTime > 30 { // We have free time for merging into bigger parts. // This should improve select performance. - lastMergeTime = time.Now() + lastMergeTime = fasttime.UnixTimestamp() isFinal = true continue } @@ -892,15 +897,15 @@ func (tb *Table) nextMergeIdx() uint64 { var ( maxOutPartItemsLock sync.Mutex - maxOutPartItemsDeadline time.Time + maxOutPartItemsDeadline uint64 lastMaxOutPartItems uint64 ) func (tb *Table) maxOutPartItems() uint64 { maxOutPartItemsLock.Lock() - if time.Until(maxOutPartItemsDeadline) < 0 { + if maxOutPartItemsDeadline < fasttime.UnixTimestamp() { lastMaxOutPartItems = tb.maxOutPartItemsSlow() - maxOutPartItemsDeadline = time.Now().Add(time.Second) + maxOutPartItemsDeadline = fasttime.UnixTimestamp() + 2 } n := lastMaxOutPartItems maxOutPartItemsLock.Unlock() diff --git a/lib/promscrape/discoveryutils/config_map.go b/lib/promscrape/discoveryutils/config_map.go index d16c95261..e82230ebf 100644 --- a/lib/promscrape/discoveryutils/config_map.go +++ b/lib/promscrape/discoveryutils/config_map.go @@ -3,6 +3,8 @@ package discoveryutils import ( "sync" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) // ConfigMap is a map for storing discovery api configs. @@ -37,7 +39,7 @@ func (cm *ConfigMap) Get(key interface{}, newConfig func() (interface{}, error)) e := cm.m[key] if e != nil { - e.lastAccessTime = time.Now() + e.lastAccessTime = fasttime.UnixTimestamp() return e.cfg, nil } cfg, err := newConfig() @@ -46,17 +48,18 @@ func (cm *ConfigMap) Get(key interface{}, newConfig func() (interface{}, error)) } cm.m[key] = &configMapEntry{ cfg: cfg, - lastAccessTime: time.Now(), + lastAccessTime: fasttime.UnixTimestamp(), } return cfg, nil } func (cm *ConfigMap) cleaner() { tc := time.NewTicker(15 * time.Minute) - for currentTime := range tc.C { + for range tc.C { + currentTime := fasttime.UnixTimestamp() cm.mu.Lock() for k, e := range cm.m { - if currentTime.Sub(e.lastAccessTime) > 10*time.Minute { + if currentTime-e.lastAccessTime > 10*60 { delete(cm.m, k) } } @@ -66,5 +69,5 @@ func (cm *ConfigMap) cleaner() { type configMapEntry struct { cfg interface{} - lastAccessTime time.Time + lastAccessTime uint64 } diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index afda93c8c..3d6657f24 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -69,7 +70,7 @@ func (ctx *streamContext) Read(r io.Reader) bool { rows := ctx.Rows.Rows // Fill missing timestamps with the current timestamp rounded to seconds. - currentTimestamp := time.Now().Unix() + currentTimestamp := int64(fasttime.UnixTimestamp()) for i := range rows { r := &rows[i] if r.Timestamp == 0 { diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index 031a986df..13c5e1890 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -68,7 +69,7 @@ func (ctx *streamContext) Read(r io.Reader) bool { rows := ctx.Rows.Rows // Fill in missing timestamps - currentTimestamp := time.Now().Unix() + currentTimestamp := int64(fasttime.UnixTimestamp()) for i := range rows { r := &rows[i] if r.Timestamp == 0 { diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index 28a842c5c..1ad6da3c9 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -67,7 +68,7 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { rows := ctx.Rows.Rows // Fill in missing timestamps - currentTimestamp := time.Now().Unix() + currentTimestamp := int64(fasttime.UnixTimestamp()) for i := range rows { r := &rows[i] if r.Timestamp == 0 { diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 5c1552c7d..b977fa3f0 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -1233,7 +1234,7 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) { } func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) { - minDate := uint64(timestampFromTime(time.Now())) / msecPerDay + minDate := fasttime.UnixDate() kb := &is.kb ts := &is.ts kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index da8d78e56..788e3f182 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -2,9 +2,9 @@ package storage import ( "sync" - "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -17,7 +17,7 @@ type inmemoryPart struct { indexData bytesutil.ByteBuffer metaindexData bytesutil.ByteBuffer - creationTime time.Time + creationTime uint64 } // Reset resets mp. @@ -29,7 +29,7 @@ func (mp *inmemoryPart) Reset() { mp.indexData.Reset() mp.metaindexData.Reset() - mp.creationTime = time.Time{} + mp.creationTime = 0 } // InitFromRows initializes mp from the given rows. @@ -42,7 +42,7 @@ func (mp *inmemoryPart) InitFromRows(rows []rawRow) { rrm := getRawRowsMarshaler() rrm.marshalToInmemoryPart(mp, rows) putRawRowsMarshaler(rrm) - mp.creationTime = time.Now() + mp.creationTime = fasttime.UnixTimestamp() } // NewPart creates new part from mp. diff --git a/lib/storage/part.go b/lib/storage/part.go index 01d5a1a05..9ab84c263 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -228,7 +229,7 @@ func (ibc *indexBlockCache) cleaner() { } func (ibc *indexBlockCache) cleanByTimeout() { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() ibc.mu.Lock() for k, ibe := range ibc.m { // Delete items accessed more than 10 minutes ago. @@ -247,7 +248,7 @@ func (ibc *indexBlockCache) Get(k uint64) *indexBlock { ibc.mu.RUnlock() if ibe != nil { - currentTime := atomic.LoadUint64(¤tTimestamp) + currentTime := fasttime.UnixTimestamp() if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime { atomic.StoreUint64(&ibe.lastAccessTime, currentTime) } @@ -276,7 +277,7 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) { // Store frequently requested ib in the cache. ibe := &indexBlockCacheEntry{ - lastAccessTime: atomic.LoadUint64(¤tTimestamp), + lastAccessTime: fasttime.UnixTimestamp(), ib: ib, } ibc.m[k] = ibe @@ -297,16 +298,3 @@ func (ibc *indexBlockCache) Len() uint64 { ibc.mu.Unlock() return n } - -func init() { - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for tm := range ticker.C { - t := uint64(tm.Unix()) - atomic.StoreUint64(¤tTimestamp, t) - } - }() -} - -var currentTimestamp uint64 diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 7555fe4e7..cb495fbf1 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -445,7 +446,7 @@ func (rrs *rawRowsShards) Len() int { type rawRowsShard struct { lock sync.Mutex rows []rawRow - lastFlushTime time.Time + lastFlushTime uint64 } func (rrs *rawRowsShard) Len() int { @@ -478,7 +479,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { rr := getRawRowsMaxSize() rrs.rows, rr.rows = rr.rows, rrs.rows rrss = append(rrss, rr) - rrs.lastFlushTime = time.Now() + rrs.lastFlushTime = fasttime.UnixTimestamp() } rrs.lock.Unlock() @@ -722,10 +723,14 @@ func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) { func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) { var rr *rawRows - currentTime := time.Now() + currentTime := fasttime.UnixTimestamp() + flushSeconds := int64(rawRowsFlushInterval.Seconds()) + if flushSeconds <= 0 { + flushSeconds = 1 + } rrs.lock.Lock() - if isFinal || currentTime.Sub(rrs.lastFlushTime) > rawRowsFlushInterval { + if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) { rr = getRawRowsMaxSize() rrs.rows, rr.rows = rr.rows, rrs.rows } @@ -764,7 +769,11 @@ func (pt *partition) inmemoryPartsFlusher() { } func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*partWrapper, error) { - currentTime := time.Now() + currentTime := fasttime.UnixTimestamp() + flushSeconds := int64(inmemoryPartsFlushInterval.Seconds()) + if flushSeconds <= 0 { + flushSeconds = 1 + } // Inmemory parts may present only in small parts. pt.partsLock.Lock() @@ -772,7 +781,7 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p if pw.mp == nil || pw.isInMerge { continue } - if force || currentTime.Sub(pw.mp.creationTime) >= inmemoryPartsFlushInterval { + if force || currentTime-pw.mp.creationTime >= uint64(flushSeconds) { pw.isInMerge = true dstPws = append(dstPws, pw) } @@ -876,7 +885,7 @@ const ( func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error { sleepTime := minMergeSleepTime - var lastMergeTime time.Time + var lastMergeTime uint64 isFinal := false t := time.NewTimer(sleepTime) for { @@ -884,7 +893,7 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error { if err == nil { // Try merging additional parts. sleepTime = minMergeSleepTime - lastMergeTime = time.Now() + lastMergeTime = fasttime.UnixTimestamp() isFinal = false continue } @@ -895,10 +904,10 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error { if err != errNothingToMerge { return err } - if time.Since(lastMergeTime) > 30*time.Second { + if fasttime.UnixTimestamp()-lastMergeTime > 30 { // We have free time for merging into bigger parts. // This should improve select performance. - lastMergeTime = time.Now() + lastMergeTime = fasttime.UnixTimestamp() isFinal = true continue } @@ -939,7 +948,7 @@ func mustGetFreeDiskSpace(path string) uint64 { defer freeSpaceMapLock.Unlock() e, ok := freeSpaceMap[path] - if ok && time.Since(e.updateTime) < time.Second { + if ok && fasttime.UnixTimestamp()-e.updateTime < 2 { // Fast path - the entry is fresh. return e.freeSpace } @@ -947,7 +956,7 @@ func mustGetFreeDiskSpace(path string) uint64 { // Slow path. // Determine the amount of free space on bigPartsPath. e.freeSpace = fs.MustGetFreeSpace(path) - e.updateTime = time.Now() + e.updateTime = fasttime.UnixTimestamp() freeSpaceMap[path] = e return e.freeSpace } @@ -958,7 +967,7 @@ var ( ) type freeSpaceEntry struct { - updateTime time.Time + updateTime uint64 freeSpace uint64 } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index cec1769fd..32f743879 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -17,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -142,14 +143,14 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8) s.dateMetricIDCache = newDateMetricIDCache() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids") hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) s.pendingHourEntries = &uint64set.Set{} - date := uint64(timestampFromTime(time.Now())) / msecPerDay + date := fasttime.UnixDate() nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date) s.nextDayMetricIDs.Store(nextDayMetricIDs) s.pendingNextDayMetricIDs = &uint64set.Set{} @@ -1138,7 +1139,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { idb := s.idb() hm := s.currHourMetricIDs.Load().(*hourMetricIDs) nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v - todayShare16bit := uint64((float64(uint64(time.Now().UnixNano()/1e9)%(3600*24)) / (3600 * 24)) * (1 << 16)) + todayShare16bit := uint64((float64(fasttime.UnixTimestamp()%(3600*24)) / (3600 * 24)) * (1 << 16)) for i := range rows { r := &rows[i] if r.Timestamp != prevTimestamp { @@ -1213,7 +1214,7 @@ type dateMetricIDCache struct { // Contains mutable map protected by mu byDateMutable *byDateMetricIDMap - lastSyncTime time.Time + lastSyncTime uint64 mu sync.Mutex } @@ -1228,7 +1229,7 @@ func (dmc *dateMetricIDCache) Reset() { // Do not reset syncsCount and resetsCount dmc.byDate.Store(newByDateMetricIDMap()) dmc.byDateMutable = newByDateMetricIDMap() - dmc.lastSyncTime = time.Now() + dmc.lastSyncTime = fasttime.UnixTimestamp() dmc.mu.Unlock() atomic.AddUint64(&dmc.resetsCount, 1) @@ -1262,13 +1263,12 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { } // Slow path. Check mutable map. - currentTime := time.Now() - + currentTime := fasttime.UnixTimestamp() dmc.mu.Lock() v = dmc.byDateMutable.get(date) ok := v.Has(metricID) mustSync := false - if currentTime.Sub(dmc.lastSyncTime) > 10*time.Second { + if currentTime-dmc.lastSyncTime > 10 { mustSync = true dmc.lastSyncTime = currentTime } @@ -1351,8 +1351,7 @@ type byDateMetricIDEntry struct { } func (s *Storage) updateNextDayMetricIDs() { - date := uint64(timestampFromTime(time.Now())) / msecPerDay - + date := fasttime.UnixDate() e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs @@ -1380,7 +1379,7 @@ func (s *Storage) updateCurrHourMetricIDs() { newMetricIDs := s.pendingHourEntries s.pendingHourEntries = &uint64set.Set{} s.pendingHourEntriesLock.Unlock() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() if newMetricIDs.Len() == 0 && hm.hour == hour { // Fast path: nothing to update. return diff --git a/lib/storage/table.go b/lib/storage/table.go index 19e3035c9..7cdd54ae2 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" @@ -353,7 +354,7 @@ func (tb *table) AddRows(rows []rawRow) error { } func (tb *table) getMinMaxTimestamps() (int64, int64) { - now := timestampFromTime(time.Now()) + now := int64(fasttime.UnixTimestamp() * 1000) minTimestamp := now - tb.retentionMilliseconds maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) if minTimestamp < 0 { @@ -384,7 +385,7 @@ func (tb *table) retentionWatcher() { case <-ticker.C: } - minTimestamp := timestampFromTime(time.Now()) - tb.retentionMilliseconds + minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMilliseconds var ptwsDrop []*partitionWrapper tb.ptwsLock.Lock() dst := tb.ptws[:0] diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go index b82ff9011..113c8053c 100644 --- a/lib/storage/table_search.go +++ b/lib/storage/table_search.go @@ -4,8 +4,8 @@ import ( "container/heap" "fmt" "io" - "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -65,7 +65,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) { // Adjust tr.MinTimestamp, so it doesn't obtain data older // than the tb retention. - now := timestampFromTime(time.Now()) + now := int64(fasttime.UnixTimestamp() * 1000) minTimestamp := now - tb.retentionMilliseconds if tr.MinTimestamp < minTimestamp { tr.MinTimestamp = minTimestamp