diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 72b291481..4951df9ac 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -372,6 +372,13 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(idbm().DateRangeSearchHits) }) + metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 { + return float64(m().DateMetricIDCacheSyncsCount) + }) + metrics.NewGauge(`vm_date_metric_id_cache_resets_total`, func() float64 { + return float64(m().DateMetricIDCacheResetsCount) + }) + metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 { return float64(m().TSIDCacheSize) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 72e709f21..9b116536d 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -934,7 +934,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { MaxTimestamp: (1 << 63) - 1, } is := db.getIndexSearch() - metricIDs, err := is.searchMetricIDs(tfss, tr, 1e12) + metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9) db.putIndexSearch(is) if err != nil { return 0, err diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 0441ebe40..975598ff3 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -322,7 +322,9 @@ type Metrics struct { MetricNameCacheMisses uint64 MetricNameCacheCollisions uint64 - DateMetricIDCacheSize uint64 + DateMetricIDCacheSize uint64 + DateMetricIDCacheSyncsCount uint64 + DateMetricIDCacheResetsCount uint64 HourMetricIDCacheSize uint64 @@ -375,6 +377,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.MetricNameCacheCollisions += cs.Collisions m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) + m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) + m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) @@ -1014,6 +1018,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { // // It should be faster than map[date]*uint64set.Set on multicore systems. type dateMetricIDCache struct { + // 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches. + syncsCount uint64 + resetsCount uint64 + // Contains immutable map byDate atomic.Value @@ -1031,9 +1039,13 @@ func newDateMetricIDCache() *dateMetricIDCache { func (dmc *dateMetricIDCache) Reset() { dmc.mu.Lock() + // Do not reset syncsCount and resetsCount dmc.byDate.Store(newByDateMetricIDMap()) dmc.byDateMutable = newByDateMetricIDMap() + dmc.lastSyncTime = time.Now() dmc.mu.Unlock() + + atomic.AddUint64(&dmc.resetsCount, 1) } func (dmc *dateMetricIDCache) EntriesCount() int { @@ -1088,10 +1100,11 @@ func (dmc *dateMetricIDCache) sync() { e.v.Union(v) } dmc.byDate.Store(dmc.byDateMutable) - byDateMutable := newByDateMetricIDMap() - dmc.byDateMutable = byDateMutable + dmc.byDateMutable = newByDateMetricIDMap() dmc.mu.Unlock() + atomic.AddUint64(&dmc.syncsCount, 1) + if dmc.EntriesCount() > memory.Allowed()/128 { dmc.Reset() } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 18cb982b6..e4f4a131c 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -13,6 +13,88 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) +func TestDateMetricIDCacheSerial(t *testing.T) { + c := newDateMetricIDCache() + if err := testDateMetricIDCache(c, false); err != nil { + t.Fatalf("unexpected error: %s", err) + } +} + +func TestDateMetricIDCacheConcurrent(t *testing.T) { + c := newDateMetricIDCache() + ch := make(chan error, 5) + for i := 0; i < 5; i++ { + go func() { + ch <- testDateMetricIDCache(c, true) + }() + } + for i := 0; i < 5; i++ { + select { + case err := <-ch: + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + case <-time.After(time.Second * 5): + t.Fatalf("timeout") + } + } +} + +func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { + type dmk struct { + date uint64 + metricID uint64 + } + m := make(map[dmk]bool) + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 1237 + if !concurrent && c.Has(date, metricID) { + if !m[dmk{date, metricID}] { + return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID) + } + continue + } + c.Set(date, metricID) + m[dmk{date, metricID}] = true + if !concurrent && !c.Has(date, metricID) { + return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID) + } + if i%11234 == 0 { + c.sync() + } + if i%34323 == 0 { + c.Reset() + m = make(map[dmk]bool) + } + } + + // Verify fast path after sync. + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 123 + c.Set(date, metricID) + } + c.sync() + for i := 0; i < 1e5; i++ { + date := uint64(i) % 3 + metricID := uint64(i) % 123 + if !concurrent && !c.Has(date, metricID) { + return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID) + } + } + + // Verify c.Reset + if n := c.EntriesCount(); !concurrent && n < 123 { + return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n) + } + c.Reset() + if n := c.EntriesCount(); !concurrent && n > 0 { + return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n) + } + return nil +} + func TestUpdateCurrHourMetricIDs(t *testing.T) { newStorage := func() *Storage { var s Storage