mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: add tests for dateMetricIDCache
This commit is contained in:
parent
c342f5e37e
commit
8e8f98f712
4 changed files with 106 additions and 4 deletions
|
@ -430,6 +430,13 @@ func registerStorageMetrics() {
|
|||
return float64(idbm().DateRangeSearchHits)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 {
|
||||
return float64(m().DateMetricIDCacheSyncsCount)
|
||||
})
|
||||
metrics.NewGauge(`vm_date_metric_id_cache_resets_total`, func() float64 {
|
||||
return float64(m().DateMetricIDCacheResetsCount)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 {
|
||||
return float64(m().TSIDCacheSize)
|
||||
})
|
||||
|
|
|
@ -919,7 +919,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
|||
MaxTimestamp: (1 << 63) - 1,
|
||||
}
|
||||
is := db.getIndexSearch()
|
||||
metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12)
|
||||
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
|
||||
db.putIndexSearch(is)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
|
|
@ -312,7 +312,9 @@ type Metrics struct {
|
|||
MetricNameCacheMisses uint64
|
||||
MetricNameCacheCollisions uint64
|
||||
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSize uint64
|
||||
DateMetricIDCacheSyncsCount uint64
|
||||
DateMetricIDCacheResetsCount uint64
|
||||
|
||||
HourMetricIDCacheSize uint64
|
||||
|
||||
|
@ -365,6 +367,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.MetricNameCacheCollisions += cs.Collisions
|
||||
|
||||
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
|
||||
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
|
||||
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
|
||||
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
|
@ -943,6 +947,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error {
|
|||
//
|
||||
// It should be faster than map[date]*uint64set.Set on multicore systems.
|
||||
type dateMetricIDCache struct {
|
||||
// 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches.
|
||||
syncsCount uint64
|
||||
resetsCount uint64
|
||||
|
||||
// Contains immutable map
|
||||
byDate atomic.Value
|
||||
|
||||
|
@ -960,9 +968,13 @@ func newDateMetricIDCache() *dateMetricIDCache {
|
|||
|
||||
func (dmc *dateMetricIDCache) Reset() {
|
||||
dmc.mu.Lock()
|
||||
// Do not reset syncsCount and resetsCount
|
||||
dmc.byDate.Store(newByDateMetricIDMap())
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
dmc.lastSyncTime = time.Now()
|
||||
dmc.mu.Unlock()
|
||||
|
||||
atomic.AddUint64(&dmc.resetsCount, 1)
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) EntriesCount() int {
|
||||
|
@ -1017,10 +1029,11 @@ func (dmc *dateMetricIDCache) sync() {
|
|||
e.v.Union(v)
|
||||
}
|
||||
dmc.byDate.Store(dmc.byDateMutable)
|
||||
byDateMutable := newByDateMetricIDMap()
|
||||
dmc.byDateMutable = byDateMutable
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
dmc.mu.Unlock()
|
||||
|
||||
atomic.AddUint64(&dmc.syncsCount, 1)
|
||||
|
||||
if dmc.EntriesCount() > memory.Allowed()/128 {
|
||||
dmc.Reset()
|
||||
}
|
||||
|
|
|
@ -13,6 +13,88 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
func TestDateMetricIDCacheSerial(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
if err := testDateMetricIDCache(c, false); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDateMetricIDCacheConcurrent(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
ch := make(chan error, 5)
|
||||
for i := 0; i < 5; i++ {
|
||||
go func() {
|
||||
ch <- testDateMetricIDCache(c, true)
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
type dmk struct {
|
||||
date uint64
|
||||
metricID uint64
|
||||
}
|
||||
m := make(map[dmk]bool)
|
||||
for i := 0; i < 1e5; i++ {
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 1237
|
||||
if !concurrent && c.Has(date, metricID) {
|
||||
if !m[dmk{date, metricID}] {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
c.Set(date, metricID)
|
||||
m[dmk{date, metricID}] = true
|
||||
if !concurrent && !c.Has(date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
|
||||
}
|
||||
if i%11234 == 0 {
|
||||
c.sync()
|
||||
}
|
||||
if i%34323 == 0 {
|
||||
c.Reset()
|
||||
m = make(map[dmk]bool)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify fast path after sync.
|
||||
for i := 0; i < 1e5; i++ {
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 123
|
||||
c.Set(date, metricID)
|
||||
}
|
||||
c.sync()
|
||||
for i := 0; i < 1e5; i++ {
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 123
|
||||
if !concurrent && !c.Has(date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify c.Reset
|
||||
if n := c.EntriesCount(); !concurrent && n < 123 {
|
||||
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
||||
}
|
||||
c.Reset()
|
||||
if n := c.EntriesCount(); !concurrent && n > 0 {
|
||||
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
||||
newStorage := func() *Storage {
|
||||
var s Storage
|
||||
|
|
Loading…
Reference in a new issue