lib/storage: speed up checking metricID existence in the list for the current date

This commit is contained in:
Aliaksandr Valialkin 2019-06-02 18:34:08 +03:00
parent c7280ba61a
commit 4794f894a4

View file

@ -50,8 +50,15 @@ type Storage struct {
// dateMetricIDCache is (Date, MetricID) cache.
dateMetricIDCache *fastcache.Cache
stop chan struct{}
retentionWatcherWG sync.WaitGroup
// Fast cache for today MetricID values.
todayMetricIDs atomic.Value
pendingTodayMetricIDsLock sync.Mutex
pendingTodayMetricIDs map[uint64]struct{}
stop chan struct{}
todayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
}
// OpenStorage opens storage on the given path with the given number of retention months.
@ -122,6 +129,10 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
}
s.tb = tb
s.todayMetricIDs.Store(&todayMetricIDs{})
s.pendingTodayMetricIDs = make(map[uint64]struct{})
s.startTodayMetricIDsUpdater()
s.startRetentionWatcher()
return s, nil
@ -341,6 +352,27 @@ func (s *Storage) retentionWatcher() {
}
}
func (s *Storage) startTodayMetricIDsUpdater() {
s.todayMetricIDsUpdaterWG.Add(1)
go func() {
s.todayMetricIDsUpdater()
s.todayMetricIDsUpdaterWG.Done()
}()
}
func (s *Storage) todayMetricIDsUpdater() {
t := time.NewTimer(time.Second)
for {
select {
case <-s.stop:
return
case <-t.C:
s.updateTodayMetricIDs()
t.Reset(time.Second)
}
}
}
func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
@ -379,6 +411,7 @@ func (s *Storage) MustClose() {
close(s.stop)
s.retentionWatcherWG.Wait()
s.todayMetricIDsUpdaterWG.Wait()
s.tb.MustClose()
s.idb().MustClose()
@ -679,14 +712,25 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
prevTimestamp = r.Timestamp
}
metricID := r.TSID.MetricID
tm := s.todayMetricIDs.Load().(*todayMetricIDs)
if date == tm.date {
if _, ok := tm.m[metricID]; ok {
// Fast path: the (date, metricID) entry is in the fast cache with today's metric ids.
continue
}
s.pendingTodayMetricIDsLock.Lock()
s.pendingTodayMetricIDs[metricID] = struct{}{}
s.pendingTodayMetricIDsLock.Unlock()
}
// Slower path: check global cache for (date, metricID) entry.
a[0] = date
a[1] = metricID
if s.dateMetricIDCache.Has(keyBuf) {
// Fast path: the (date, metricID) entry is in the cache.
continue
}
// Slow path: store the entry in the cache and in the indexDB.
// Slow path: store the entry in the (date, metricID) cache and in the indexDB.
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
s.dateMetricIDCache.Set(keyBuf, nil)
@ -698,6 +742,53 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
return errors
}
func (s *Storage) updateTodayMetricIDs() {
tm := s.todayMetricIDs.Load().(*todayMetricIDs)
s.pendingTodayMetricIDsLock.Lock()
newMetricIDsLen := len(s.pendingTodayMetricIDs)
s.pendingTodayMetricIDsLock.Unlock()
today := uint64(timestampFromTime(time.Now())) / msecPerDay
if newMetricIDsLen == 0 {
if tm.date == today {
// Fast path: nothing to update.
return
}
// Reset tm because of new day.
tmNew := &todayMetricIDs{
m: make(map[uint64]struct{}, len(tm.m)),
date: today,
}
s.todayMetricIDs.Store(tmNew)
return
}
// Slow path tm.m must be updated with non-empty s.pendingTodayMetricIDs.
m := make(map[uint64]struct{}, len(tm.m)+newMetricIDsLen)
if tm.date == today {
for metricID := range tm.m {
m[metricID] = struct{}{}
}
}
s.pendingTodayMetricIDsLock.Lock()
for metricID := range s.pendingTodayMetricIDs {
m[metricID] = struct{}{}
}
s.pendingTodayMetricIDs = make(map[uint64]struct{}, len(s.pendingTodayMetricIDs))
s.pendingTodayMetricIDsLock.Unlock()
tmNew := &todayMetricIDs{
m: m,
date: today,
}
s.todayMetricIDs.Store(tmNew)
}
type todayMetricIDs struct {
m map[uint64]struct{}
date uint64
}
func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool {
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))[:]
buf = s.tsidCache.Get(buf[:0], metricName)