mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: tune the logic for pre-populating the per-day inverted index for the next day
- Postpone the pre-population to the last hour of the current day. This should reduce the number of useless entries in the next per-day index: entries should not be created there for time series that stop being pushed during the current day. - Make the pre-population smoother over time by using the hash of MetricID instead of MetricID itself when deciding whether the given MetricID needs pre-population. - Sync the logic for pre-populating the next-day inverted index with the logic for pre-populating the tsid cache after indexdb rotation. This should improve code maintainability. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
This commit is contained in:
parent
b1f94f7f0e
commit
96dce63dbd
2 changed files with 28 additions and 15 deletions
|
@ -370,12 +370,13 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
|
|||
//
|
||||
// It returns true if new index entry was created, and false if it was skipped.
|
||||
func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) {
|
||||
h := xxhash.Sum64(metricNameRaw)
|
||||
p := float64(uint32(h)) / (1 << 32)
|
||||
pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600
|
||||
if p > pMin {
|
||||
// Fast path: there is no need creating indexes for metricNameRaw yet.
|
||||
return false, nil
|
||||
if pMin < 1 {
|
||||
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
|
||||
if p > pMin {
|
||||
// Fast path: there is no need creating indexes for metricNameRaw yet.
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
// Slow path: create indexes for (tsid, metricNameRaw) at db.
|
||||
mn := GetMetricName()
|
||||
|
|
|
@ -1983,7 +1983,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrevDate := hmPrev.hour / 24
|
||||
nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
|
||||
todayShare16bit := uint64((float64(fasttime.UnixTimestamp()%(3600*24)) / (3600 * 24)) * (1 << 16))
|
||||
ts := fasttime.UnixTimestamp()
|
||||
// Start pre-populating the next per-day inverted index during the last hour of the current day.
|
||||
// pMin linearly increases from 0 to 1 during the last hour of the day.
|
||||
pMin := (float64(ts%(3600*24)) / 3600) - 23
|
||||
type pendingDateMetricID struct {
|
||||
date uint64
|
||||
metricID uint64
|
||||
|
@ -2012,18 +2015,20 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
// Fast path: the metricID is in the current hour cache.
|
||||
// This means the metricID has been already added to per-day inverted index.
|
||||
|
||||
// Gradually pre-populate per-day inverted index for the next day
|
||||
// during the current day.
|
||||
// Gradually pre-populate per-day inverted index for the next day during the last hour of the current day.
|
||||
// This should reduce CPU usage spike and slowdown at the beginning of the next day
|
||||
// when entries for all the active time series must be added to the index.
|
||||
// This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 .
|
||||
if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) {
|
||||
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
|
||||
date: date + 1,
|
||||
metricID: metricID,
|
||||
mr: mrs[i],
|
||||
})
|
||||
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
|
||||
if pMin > 0 {
|
||||
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
|
||||
if p < pMin && !nextDayMetricIDs.Has(metricID) {
|
||||
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
|
||||
date: date + 1,
|
||||
metricID: metricID,
|
||||
mr: mrs[i],
|
||||
})
|
||||
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -2117,6 +2122,13 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
return firstError
|
||||
}
|
||||
|
||||
// fastHashUint64 returns a cheap, deterministic 64-bit mix of x.
//
// It applies three xor-shift steps (right by 12, left by 25, right by 27)
// and then multiplies by a fixed 64-bit constant, which matches a single
// step of the xorshift64* scheme. It is not a cryptographic hash.
//
// Note that fastHashUint64(0) == 0, since every step maps zero to zero.
//
// The callers visible in this change keep only the low 32 bits of the result
// and divide them by 1<<32 in order to obtain a fraction in the range [0, 1).
func fastHashUint64(x uint64) uint64 {
|
||||
x ^= x >> 12 // a
|
||||
x ^= x << 25 // b
|
||||
x ^= x >> 27 // c
|
||||
return x * 2685821657736338717
|
||||
}
|
||||
|
||||
// dateMetricIDCache is fast cache for holding (date, metricID) entries.
|
||||
//
|
||||
// It should be faster than map[date]*uint64set.Set on multicore systems.
|
||||
|
|
Loading…
Reference in a new issue