diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 33487cc1c..eb2b43d7c 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2496,7 +2496,7 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) { if e.date == date { pendingMetricIDs.Union(&e.v) } else { - // Do not add pendingMetricIDs from the previous day to the cyrrent day, + // Do not add pendingMetricIDs from the previous day to the current day, // since this may result in missing registration of the metricIDs in the per-day inverted index. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 pendingMetricIDs = &uint64set.Set{} @@ -2535,9 +2535,8 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) { byTenant = make(map[accountProjectKey]*uint64set.Set) isFull = true } - - if hour%24 != 0 { - // Do not add pending metricIDs from the previous hour to the current hour on the next day, + if hm.hour == hour || hour%24 != 0 { + // Do not add pending metricIDs from the previous hour on the previous day to the current hour, // since this may result in missing registration of the metricIDs in the per-day inverted index. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 for _, x := range newEntries { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 9572fed46..f8e5306a8 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -156,9 +156,12 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() hour := fasttime.UnixHour() + if hour%24 == 0 { + hour++ + } hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: 123, + hour: hour - 1, } hmOrig.m.Add(12) hmOrig.m.Add(34) @@ -251,9 +254,12 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { } hour := fasttime.UnixHour() + if hour%24 == 0 { + hour++ + } hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: 123, + hour: hour - 1, } hmOrig.m.Add(12) hmOrig.m.Add(34) @@ -355,6 +361,74 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0) } }) + t.Run("nonempty_pending_metric_ids_valid_curr_hour_start_of_day", func(t *testing.T) { + s := newStorage() + s.pendingHourEntries = []pendingHourMetricIDEntry{ + {AccountID: 123, ProjectID: 431, MetricID: 343}, + {AccountID: 123, ProjectID: 431, MetricID: 32424}, + {AccountID: 1, ProjectID: 2, MetricID: 8293432}, + } + mExpected := &uint64set.Set{} + for _, e := range s.pendingHourEntries { + mExpected.Add(e.MetricID) + } + byTenantExpected := make(map[accountProjectKey]*uint64set.Set) + for _, e := range s.pendingHourEntries { + k := accountProjectKey{ + AccountID: e.AccountID, + ProjectID: e.ProjectID, + } + x := byTenantExpected[k] + if x == nil { + x = &uint64set.Set{} + byTenantExpected[k] = x + } + x.Add(e.MetricID) + } + + hour := fasttime.UnixHour() + hour -= hour % 24 + hmOrig := &hourMetricIDs{ + m: &uint64set.Set{}, + hour: hour, + } + hmOrig.m.Add(12) + hmOrig.m.Add(34) + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs(hour) + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + // It is possible new hour occurred. Update the hour and verify it again. + hour = uint64(timestampFromTime(time.Now())) / msecPerHour + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + // Do not run other checks, since they may fail. + return + } + m := mExpected.Clone() + hmOrig.m.ForEach(func(part []uint64) bool { + for _, metricID := range part { + m.Add(metricID) + } + return true + }) + if !hmCurr.m.Equal(m) { + t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) + } + if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) { + t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + hmEmpty := &hourMetricIDs{} + if !reflect.DeepEqual(hmPrev, hmEmpty) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) + } + if len(s.pendingHourEntries) != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0) + } + }) t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) { s := newStorage()