diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index da4eaba6ce..ba3c0365c3 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -414,9 +414,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheSizeBytes)
 	})
-	metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheSizeBytes)
-	})
 	metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
 		return float64(idbm().TagCacheSizeBytes)
 	})
@@ -433,9 +430,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheRequests)
 	})
-	metrics.NewGauge(`vm_cache_requests_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheRequests)
-	})
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 {
 		return float64(tm().BigIndexBlocksCacheRequests)
 	})
@@ -467,9 +461,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheMisses)
 	})
-	metrics.NewGauge(`vm_cache_misses_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheMisses)
-	})
 	metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 {
 		return float64(tm().BigIndexBlocksCacheMisses)
 	})
@@ -502,7 +493,4 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_collisions_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheCollisions)
 	})
-	metrics.NewGauge(`vm_cache_collisions_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheCollisions)
-	})
 }
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index ea8f91de8c..87dee70953 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -905,7 +905,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
 	// Obtain metricIDs to delete.
 	tr := TimeRange{
 		MinTimestamp: 0,
-		MaxTimestamp: (1<<63)-1,
+		MaxTimestamp: (1 << 63) - 1,
 	}
 	is := db.getIndexSearch()
 	metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12)
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 28e7b63bed..877a35675b 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -59,7 +59,7 @@ type Storage struct {
 	metricNameCache *workingsetcache.Cache
 
 	// dateMetricIDCache is (Date, MetricID) cache.
-	dateMetricIDCache *workingsetcache.Cache
+	dateMetricIDCache *dateMetricIDCache
 
 	// Fast cache for MetricID values occured during the current hour.
 	currHourMetricIDs atomic.Value
@@ -129,7 +129,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3)
 	s.metricIDCache = s.mustLoadCache("MetricID->TSID", "metricID_tsid", mem/16)
 	s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8)
-	s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32)
+	s.dateMetricIDCache = newDateMetricIDCache()
 
 	hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 	hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids")
@@ -322,11 +322,7 @@ type Metrics struct {
 	MetricNameCacheMisses     uint64
 	MetricNameCacheCollisions uint64
 
-	DateMetricIDCacheSize       uint64
-	DateMetricIDCacheSizeBytes  uint64
-	DateMetricIDCacheRequests   uint64
-	DateMetricIDCacheMisses     uint64
-	DateMetricIDCacheCollisions uint64
+	DateMetricIDCacheSize uint64
 
 	HourMetricIDCacheSize uint64
 
@@ -378,13 +374,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.MetricNameCacheMisses += cs.Misses
 	m.MetricNameCacheCollisions += cs.Collisions
 
-	cs.Reset()
-	s.dateMetricIDCache.UpdateStats(&cs)
-	m.DateMetricIDCacheSize += cs.EntriesCount
-	m.DateMetricIDCacheSizeBytes += cs.BytesSize
-	m.DateMetricIDCacheRequests += cs.GetCalls
-	m.DateMetricIDCacheMisses += cs.Misses
-	m.DateMetricIDCacheCollisions += cs.Collisions
+	m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
 
 	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
@@ -498,7 +488,6 @@ func (s *Storage) MustClose() {
 	s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
 	s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
 	s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
-	s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
 
 	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@@ -973,11 +962,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 	var date uint64
 	var hour uint64
 	var prevTimestamp int64
-	kb := kbPool.Get()
-	defer kbPool.Put(kb)
-	kb.B = bytesutil.Resize(kb.B, 16)
-	keyBuf := kb.B
-	a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
 	idb := s.idb()
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	for i := range rows {
@@ -1006,16 +990,14 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 		}
 
 		// Slower path: check global cache for (date, metricID) entry.
-		a[0] = date
-		a[1] = metricID
-		if s.dateMetricIDCache.Has(keyBuf) {
+		if s.dateMetricIDCache.Has(date, metricID) {
 			continue
 		}
 
 		// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
 		// It is OK if the (date, metricID) entry is added multiple times to db
 		// by concurrent goroutines.
-		s.dateMetricIDCache.Set(keyBuf, nil)
+		s.dateMetricIDCache.Set(date, metricID)
 		if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
 			lastError = err
 			continue
@@ -1024,6 +1006,138 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 	return lastError
 }
 
+// dateMetricIDCache is fast cache for holding (date, metricID) entries.
+//
+// It should be faster than map[date]*uint64set.Set on multicore systems.
+type dateMetricIDCache struct {
+	// Contains immutable map
+	byDate atomic.Value
+
+	// Contains mutable map protected by mu
+	byDateMutable *byDateMetricIDMap
+	lastSyncTime  time.Time
+	mu            sync.RWMutex
+}
+
+func newDateMetricIDCache() *dateMetricIDCache {
+	var dmc dateMetricIDCache
+	dmc.Reset()
+	return &dmc
+}
+
+func (dmc *dateMetricIDCache) Reset() {
+	dmc.mu.Lock()
+	dmc.byDate.Store(newByDateMetricIDMap())
+	dmc.byDateMutable = newByDateMetricIDMap()
+	dmc.mu.Unlock()
+}
+
+func (dmc *dateMetricIDCache) EntriesCount() int {
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	n := 0
+	for _, e := range byDate.m {
+		n += e.v.Len()
+	}
+	return n
+}
+
+func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	v := byDate.get(date)
+	if v.Has(metricID) {
+		// Fast path.
+		// The majority of calls must go here.
+		return true
+	}
+
+	// Slow path. Check mutable map.
+	currentTime := time.Now()
+
+	dmc.mu.RLock()
+	v = dmc.byDateMutable.get(date)
+	ok := v.Has(metricID)
+	mustSync := false
+	if currentTime.Sub(dmc.lastSyncTime) > 10*time.Second {
+		mustSync = true
+		dmc.lastSyncTime = currentTime
+	}
+	dmc.mu.RUnlock()
+
+	if mustSync {
+		dmc.sync()
+	}
+	return ok
+}
+
+func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
+	dmc.mu.Lock()
+	v := dmc.byDateMutable.getOrCreate(date)
+	v.Add(metricID)
+	dmc.mu.Unlock()
+}
+
+func (dmc *dateMetricIDCache) sync() {
+	dmc.mu.Lock()
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	for date, e := range dmc.byDateMutable.m {
+		v := byDate.get(date)
+		e.v.Union(v)
+	}
+	dmc.byDate.Store(dmc.byDateMutable)
+	byDateMutable := newByDateMetricIDMap()
+	dmc.byDateMutable = byDateMutable
+	dmc.mu.Unlock()
+
+	if dmc.EntriesCount() > memory.Allowed()/128 {
+		dmc.Reset()
+	}
+}
+
+type byDateMetricIDMap struct {
+	hotEntry atomic.Value
+	m        map[uint64]*byDateMetricIDEntry
+}
+
+func newByDateMetricIDMap() *byDateMetricIDMap {
+	dmm := &byDateMetricIDMap{
+		m: make(map[uint64]*byDateMetricIDEntry),
+	}
+	dmm.hotEntry.Store(&byDateMetricIDEntry{})
+	return dmm
+}
+
+func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
+	hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry)
+	if hotEntry.date == date {
+		// Fast path
		return &hotEntry.v
+	}
+	// Slow path
+	e := dmm.m[date]
+	if e == nil {
+		return nil
+	}
+	dmm.hotEntry.Store(e)
+	return &e.v
+}
+
+func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
+	v := dmm.get(date)
+	if v != nil {
+		return v
+	}
+	e := &byDateMetricIDEntry{
+		date: date,
+	}
+	dmm.m[date] = e
+	return &e.v
+}
+
+type byDateMetricIDEntry struct {
+	date uint64
+	v    uint64set.Set
+}
+
 func (s *Storage) updateCurrHourMetricIDs() {
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.pendingHourEntriesLock.Lock()
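The diff above replaces the byte-keyed workingsetcache lookups (date and metricID packed into a 16-byte key via unsafe) with a typed dateMetricIDCache exposing Has(date, metricID) and Set(date, metricID). Reads hit an immutable byDate map stored in an atomic.Value; writes go to a mutable map guarded by mu, Has triggers sync() at most once per 10 seconds to merge the mutable map into a fresh immutable snapshot, and the whole cache is reset once it grows past memory.Allowed()/128. Below is a minimal sketch of exercising the new API from a package-internal test; the test is not part of this diff, and it assumes placement inside lib/storage so the unexported identifiers are visible. The sample date and metricID values are arbitrary.

package storage

import "testing"

// TestDateMetricIDCacheSketch is a hypothetical sketch exercising the
// dateMetricIDCache API introduced in the diff above.
func TestDateMetricIDCacheSketch(t *testing.T) {
	c := newDateMetricIDCache()
	const date, metricID = 18000, 123 // arbitrary sample values

	if c.Has(date, metricID) {
		t.Fatalf("unexpected hit for an entry that was never set")
	}
	c.Set(date, metricID)
	if !c.Has(date, metricID) {
		// Has falls back to the mutable map under an RLock, so the entry
		// must be visible immediately after Set.
		t.Fatalf("missing (date, metricID) entry right after Set")
	}

	// EntriesCount reflects only the immutable snapshot, which is populated
	// by sync() (triggered from Has at most once per 10 seconds), so it may
	// still report 0 at this point.
	_ = c.EntriesCount()
}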