diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 5ba759a45..24874ad70 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -430,6 +430,13 @@ func registerStorageMetrics() { return float64(idbm().DateRangeSearchHits) }) + metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 { + return float64(m().DateMetricIDCacheSyncsCount) + }) + metrics.NewGauge(`vm_date_metric_id_cache_resets_total`, func() float64 { + return float64(m().DateMetricIDCacheResetsCount) + }) + metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 { return float64(m().TSIDCacheSize) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 87270fffc..5a9dd3068 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -919,7 +919,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch() - metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12) + metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9) db.putIndexSearch(is) if err != nil { return 0, err diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9a9f1bca7..6b211c7ce 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -312,7 +312,9 @@ type Metrics struct { MetricNameCacheMisses uint64 MetricNameCacheCollisions uint64 - DateMetricIDCacheSize uint64 + DateMetricIDCacheSize uint64 + DateMetricIDCacheSyncsCount uint64 + DateMetricIDCacheResetsCount uint64 HourMetricIDCacheSize uint64 @@ -365,6 +367,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.MetricNameCacheCollisions += cs.Collisions m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) + m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) + m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) @@ -943,6 +947,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { // // It should be faster than map[date]*uint64set.Set on multicore systems. type dateMetricIDCache struct { + // 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches. + syncsCount uint64 + resetsCount uint64 + // Contains immutable map byDate atomic.Value @@ -960,9 +968,13 @@ func newDateMetricIDCache() *dateMetricIDCache { func (dmc *dateMetricIDCache) Reset() { dmc.mu.Lock() + // Do not reset syncsCount and resetsCount dmc.byDate.Store(newByDateMetricIDMap()) dmc.byDateMutable = newByDateMetricIDMap() + dmc.lastSyncTime = time.Now() dmc.mu.Unlock() + + atomic.AddUint64(&dmc.resetsCount, 1) } func (dmc *dateMetricIDCache) EntriesCount() int { @@ -1017,10 +1029,11 @@ func (dmc *dateMetricIDCache) sync() { e.v.Union(v) } dmc.byDate.Store(dmc.byDateMutable) - byDateMutable := newByDateMetricIDMap() - dmc.byDateMutable = byDateMutable + dmc.byDateMutable = newByDateMetricIDMap() dmc.mu.Unlock() + atomic.AddUint64(&dmc.syncsCount, 1) + if dmc.EntriesCount() > memory.Allowed()/128 { dmc.Reset() } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 3c130d21c..5e97bcd82 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -13,6 +13,88 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) +func TestDateMetricIDCacheSerial(t *testing.T) { + c := newDateMetricIDCache() + if err := testDateMetricIDCache(c, false); err != nil { + t.Fatalf("unexpected error: %s", err) + } +} + +func TestDateMetricIDCacheConcurrent(t *testing.T) { + c := newDateMetricIDCache() + ch := make(chan error, 5) + for i := 0; i < 5; i++ { + go func() { + ch <- testDateMetricIDCache(c, true) + }() + } + for i := 0; i < 5; i++ { + select { + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(time.Second * 5): + t.Fatalf("timeout") + } + } +} + +func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { + type dmk struct { + date uint64 + metricID uint64 + } + m := make(map[dmk]bool) + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 1237 + if !concurrent && c.Has(date, metricID) { + if !m[dmk{date, metricID}] { + return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID) + } + continue + } + c.Set(date, metricID) + m[dmk{date, metricID}] = true + if !concurrent && !c.Has(date, metricID) { + return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID) + } + if i%11234 == 0 { + c.sync() + } + if i%34323 == 0 { + c.Reset() + m = make(map[dmk]bool) + } + } + + // Verify fast path after sync. + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 123 + c.Set(date, metricID) + } + c.sync() + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 123 + if !concurrent && !c.Has(date, metricID) { + return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID) + } + } + + // Verify c.Reset + if n := c.EntriesCount(); !concurrent && n < 123 { + return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n) + } + c.Reset() + if n := c.EntriesCount(); !concurrent && n > 0 { + return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n) + } + return nil +} + func TestUpdateCurrHourMetricIDs(t *testing.T) { newStorage := func() *Storage { var s Storage