Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2025-03-21 15:45:01 +00:00)
lib/storage: use specialized cache for (date, metricID) entries
This improves ingestion performance.
parent 9a43902bd8
commit dea2f3efed

3 changed files with 139 additions and 37 deletions
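The diff replaces the general-purpose workingsetcache.Cache, which was keyed by a packed 16-byte (date, metricID) buffer, with a purpose-built in-memory cache: an immutable map of per-date metricID sets that readers load lock-free through atomic.Value, plus a small mutable overlay guarded by a mutex. The overlay is merged into a fresh immutable map at most once every 10 seconds, and the whole cache is reset once it holds more than memory.Allowed()/128 entries. Below is a minimal standalone sketch of that two-level design; it is an illustration, not the code from this commit: it uses plain Go maps instead of the repository's uint64set.Set and omits the hot-entry optimization and the memory-based reset.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// dateSetCache is a simplified analogue of the dateMetricIDCache introduced by
// this commit: Has() first probes an immutable snapshot without taking any
// locks; misses fall back to a mutable overlay protected by a mutex, and the
// overlay is periodically merged into a fresh snapshot.
type dateSetCache struct {
	snapshot atomic.Value // holds map[uint64]map[uint64]struct{}; never mutated once stored

	mu           sync.RWMutex
	mutable      map[uint64]map[uint64]struct{}
	lastSyncTime time.Time
}

func newDateSetCache() *dateSetCache {
	c := &dateSetCache{mutable: map[uint64]map[uint64]struct{}{}}
	c.snapshot.Store(map[uint64]map[uint64]struct{}{})
	return c
}

func (c *dateSetCache) Has(date, metricID uint64) bool {
	// Fast path: lock-free lookup in the immutable snapshot.
	snap := c.snapshot.Load().(map[uint64]map[uint64]struct{})
	if _, ok := snap[date][metricID]; ok {
		return true
	}
	// Slow path: check the mutable overlay under a read lock.
	c.mu.RLock()
	_, ok := c.mutable[date][metricID]
	mustSync := time.Since(c.lastSyncTime) > 10*time.Second
	c.mu.RUnlock()
	if mustSync {
		c.sync()
	}
	return ok
}

func (c *dateSetCache) Set(date, metricID uint64) {
	c.mu.Lock()
	m := c.mutable[date]
	if m == nil {
		m = map[uint64]struct{}{}
		c.mutable[date] = m
	}
	m[metricID] = struct{}{}
	c.mu.Unlock()
}

// sync publishes a new immutable snapshot containing everything from the old
// snapshot plus the mutable overlay, then starts a fresh overlay.
func (c *dateSetCache) sync() {
	c.mu.Lock()
	defer c.mu.Unlock()
	old := c.snapshot.Load().(map[uint64]map[uint64]struct{})
	merged := make(map[uint64]map[uint64]struct{}, len(old)+len(c.mutable))
	for date, set := range old {
		dst := make(map[uint64]struct{}, len(set))
		for id := range set {
			dst[id] = struct{}{}
		}
		merged[date] = dst
	}
	for date, set := range c.mutable {
		dst := merged[date]
		if dst == nil {
			dst = make(map[uint64]struct{}, len(set))
			merged[date] = dst
		}
		for id := range set {
			dst[id] = struct{}{}
		}
	}
	c.snapshot.Store(merged)
	c.mutable = map[uint64]map[uint64]struct{}{}
	c.lastSyncTime = time.Now()
}

func main() {
	c := newDateSetCache()
	c.Set(18340, 12345) // example values; the real code derives date from the row timestamp
	fmt.Println(c.Has(18340, 12345), c.Has(18340, 777)) // true false
}

The point of the split is that the common case, re-seeing a (date, metricID) pair that is already registered, never takes a lock or hashes a byte key, which is presumably what makes the hot ingestion path cheaper on multicore machines. The full diff follows.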
@@ -414,9 +414,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheSizeBytes)
 	})
-	metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheSizeBytes)
-	})
 	metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
 		return float64(idbm().TagCacheSizeBytes)
 	})

@@ -433,9 +430,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheRequests)
 	})
-	metrics.NewGauge(`vm_cache_requests_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheRequests)
-	})
 	metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 {
 		return float64(tm().BigIndexBlocksCacheRequests)
 	})

@@ -467,9 +461,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheMisses)
 	})
-	metrics.NewGauge(`vm_cache_misses_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheMisses)
-	})
 	metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 {
 		return float64(tm().BigIndexBlocksCacheMisses)
 	})

@@ -502,7 +493,4 @@ func registerStorageMetrics(strg *storage.Storage) {
 	metrics.NewGauge(`vm_cache_collisions_total{type="storage/metricName"}`, func() float64 {
 		return float64(m().MetricNameCacheCollisions)
 	})
-	metrics.NewGauge(`vm_cache_collisions_total{type="storage/date_metricID"}`, func() float64 {
-		return float64(m().DateMetricIDCacheCollisions)
-	})
 }
@@ -905,7 +905,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
 	// Obtain metricIDs to delete.
 	tr := TimeRange{
 		MinTimestamp: 0,
-		MaxTimestamp: (1<<63)-1,
+		MaxTimestamp: (1 << 63) - 1,
 	}
 	is := db.getIndexSearch()
 	metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12)
@@ -59,7 +59,7 @@ type Storage struct {
 	metricNameCache *workingsetcache.Cache
 
 	// dateMetricIDCache is (Date, MetricID) cache.
-	dateMetricIDCache *workingsetcache.Cache
+	dateMetricIDCache *dateMetricIDCache
 
 	// Fast cache for MetricID values occured during the current hour.
 	currHourMetricIDs atomic.Value

@@ -129,7 +129,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	s.tsidCache = s.mustLoadCache("MetricName->TSID", "metricName_tsid", mem/3)
 	s.metricIDCache = s.mustLoadCache("MetricID->TSID", "metricID_tsid", mem/16)
 	s.metricNameCache = s.mustLoadCache("MetricID->MetricName", "metricID_metricName", mem/8)
-	s.dateMetricIDCache = s.mustLoadCache("Date->MetricID", "date_metricID", mem/32)
+	s.dateMetricIDCache = newDateMetricIDCache()
 
 	hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 	hmCurr := s.mustLoadHourMetricIDs(hour, "curr_hour_metric_ids")
@@ -322,11 +322,7 @@ type Metrics struct {
 	MetricNameCacheMisses uint64
 	MetricNameCacheCollisions uint64
 
 	DateMetricIDCacheSize uint64
-	DateMetricIDCacheSizeBytes uint64
-	DateMetricIDCacheRequests uint64
-	DateMetricIDCacheMisses uint64
-	DateMetricIDCacheCollisions uint64
 
 	HourMetricIDCacheSize uint64
 
@@ -378,13 +374,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.MetricNameCacheMisses += cs.Misses
 	m.MetricNameCacheCollisions += cs.Collisions
 
-	cs.Reset()
-	s.dateMetricIDCache.UpdateStats(&cs)
-	m.DateMetricIDCacheSize += cs.EntriesCount
-	m.DateMetricIDCacheSizeBytes += cs.BytesSize
-	m.DateMetricIDCacheRequests += cs.GetCalls
-	m.DateMetricIDCacheMisses += cs.Misses
-	m.DateMetricIDCacheCollisions += cs.Collisions
+	m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
 
 	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
@@ -498,7 +488,6 @@ func (s *Storage) MustClose() {
 	s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
 	s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
 	s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
-	s.mustSaveAndStopCache(s.dateMetricIDCache, "Date->MetricID", "date_metricID")
 
 	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@@ -973,11 +962,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
 	var date uint64
 	var hour uint64
 	var prevTimestamp int64
-	kb := kbPool.Get()
-	defer kbPool.Put(kb)
-	kb.B = bytesutil.Resize(kb.B, 16)
-	keyBuf := kb.B
-	a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
 	idb := s.idb()
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	for i := range rows {
@@ -1006,16 +990,14 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
 		}
 
 		// Slower path: check global cache for (date, metricID) entry.
-		a[0] = date
-		a[1] = metricID
-		if s.dateMetricIDCache.Has(keyBuf) {
+		if s.dateMetricIDCache.Has(date, metricID) {
 			continue
 		}
 
 		// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
 		// It is OK if the (date, metricID) entry is added multiple times to db
 		// by concurrent goroutines.
-		s.dateMetricIDCache.Set(keyBuf, nil)
+		s.dateMetricIDCache.Set(date, metricID)
 		if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil {
 			lastError = err
 			continue
@@ -1024,6 +1006,138 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
 	return lastError
 }
 
+// dateMetricIDCache is fast cache for holding (date, metricID) entries.
+//
+// It should be faster than map[date]*uint64set.Set on multicore systems.
+type dateMetricIDCache struct {
+	// Contains immutable map
+	byDate atomic.Value
+
+	// Contains mutable map protected by mu
+	byDateMutable *byDateMetricIDMap
+	lastSyncTime time.Time
+	mu sync.RWMutex
+}
+
+func newDateMetricIDCache() *dateMetricIDCache {
+	var dmc dateMetricIDCache
+	dmc.Reset()
+	return &dmc
+}
+
+func (dmc *dateMetricIDCache) Reset() {
+	dmc.mu.Lock()
+	dmc.byDate.Store(newByDateMetricIDMap())
+	dmc.byDateMutable = newByDateMetricIDMap()
+	dmc.mu.Unlock()
+}
+
+func (dmc *dateMetricIDCache) EntriesCount() int {
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	n := 0
+	for _, e := range byDate.m {
+		n += e.v.Len()
+	}
+	return n
+}
+
+func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	v := byDate.get(date)
+	if v.Has(metricID) {
+		// Fast path.
+		// The majority of calls must go here.
+		return true
+	}
+
+	// Slow path. Check mutable map.
+	currentTime := time.Now()
+
+	dmc.mu.RLock()
+	v = dmc.byDateMutable.get(date)
+	ok := v.Has(metricID)
+	mustSync := false
+	if currentTime.Sub(dmc.lastSyncTime) > 10*time.Second {
+		mustSync = true
+		dmc.lastSyncTime = currentTime
+	}
+	dmc.mu.RUnlock()
+
+	if mustSync {
+		dmc.sync()
+	}
+	return ok
+}
+
+func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
+	dmc.mu.Lock()
+	v := dmc.byDateMutable.getOrCreate(date)
+	v.Add(metricID)
+	dmc.mu.Unlock()
+}
+
+func (dmc *dateMetricIDCache) sync() {
+	dmc.mu.Lock()
+	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	for date, e := range dmc.byDateMutable.m {
+		v := byDate.get(date)
+		e.v.Union(v)
+	}
+	dmc.byDate.Store(dmc.byDateMutable)
+	byDateMutable := newByDateMetricIDMap()
+	dmc.byDateMutable = byDateMutable
+	dmc.mu.Unlock()
+
+	if dmc.EntriesCount() > memory.Allowed()/128 {
+		dmc.Reset()
+	}
+}
+
+type byDateMetricIDMap struct {
+	hotEntry atomic.Value
+	m map[uint64]*byDateMetricIDEntry
+}
+
+func newByDateMetricIDMap() *byDateMetricIDMap {
+	dmm := &byDateMetricIDMap{
+		m: make(map[uint64]*byDateMetricIDEntry),
+	}
+	dmm.hotEntry.Store(&byDateMetricIDEntry{})
+	return dmm
+}
+
+func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
+	hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry)
+	if hotEntry.date == date {
+		// Fast path
		return &hotEntry.v
+	}
+	// Slow path
+	e := dmm.m[date]
+	if e == nil {
+		return nil
+	}
+	dmm.hotEntry.Store(e)
+	return &e.v
+}
+
+func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
+	v := dmm.get(date)
+	if v != nil {
+		return v
+	}
+	e := &byDateMetricIDEntry{
+		date: date,
+	}
+	dmm.m[date] = e
+	return &e.v
+}
+
+type byDateMetricIDEntry struct {
+	date uint64
+	v uint64set.Set
+}
+
 func (s *Storage) updateCurrHourMetricIDs() {
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.pendingHourEntriesLock.Lock()
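For contrast with the new Has(date, metricID)/Set(date, metricID) API, here is a rough standalone sketch of the shape of the removed lookup path: each row had its (date, metricID) pair packed into a 16-byte key buffer (the removed lines did this with an unsafe cast into a pooled buffer) before probing the byte-keyed cache. This is an illustration under stated assumptions, not the removed code: it uses encoding/binary instead of unsafe and a plain map in place of workingsetcache.Cache.

package main

import (
	"encoding/binary"
	"fmt"
)

// oldStyleKey packs (date, metricID) into a 16-byte key, roughly what the
// removed code did before handing the key to the byte-keyed cache.
func oldStyleKey(buf []byte, date, metricID uint64) []byte {
	if cap(buf) < 16 {
		buf = make([]byte, 16)
	}
	buf = buf[:16]
	binary.LittleEndian.PutUint64(buf[0:8], date)
	binary.LittleEndian.PutUint64(buf[8:16], metricID)
	return buf
}

func main() {
	// Old shape: one 16-byte key per (date, metricID) entry in a byte-keyed cache.
	byteKeyed := map[string]struct{}{}
	buf := oldStyleKey(nil, 18340, 12345)
	byteKeyed[string(buf)] = struct{}{}

	// New shape: a per-date set of metricIDs, probed with two plain integers.
	byDate := map[uint64]map[uint64]struct{}{
		18340: {12345: {}},
	}

	_, okOld := byteKeyed[string(oldStyleKey(buf, 18340, 12345))]
	_, okNew := byDate[18340][12345]
	fmt.Println(okOld, okNew) // true true
}

Avoiding the per-row key encoding and the byte-key hashing on every lookup is, presumably, where most of the ingestion speedup comes from.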