diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1d6371ee8..3aef080f2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320). + ## [v1.83.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.83.0) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index f05907169..ccb154e75 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -715,10 +715,12 @@ func (s *Storage) currHourMetricIDsUpdater() { for { select { case <-s.stop: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) return case <-ticker.C: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) } } } @@ -731,10 +733,12 @@ func (s *Storage) nextDayMetricIDsUpdater() { for { select { case <-s.stop: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) return case <-ticker.C: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) } } } @@ -2411,8 +2415,7 @@ type byDateMetricIDEntry struct { v uint64set.Set } -func (s *Storage) updateNextDayMetricIDs() { - date := fasttime.UnixDate() +func (s *Storage) updateNextDayMetricIDs(date uint64) { e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs @@ -2426,6 +2429,11 @@ func (s *Storage) updateNextDayMetricIDs() { // Slow path: union pendingMetricIDs with e.v if e.date == date { pendingMetricIDs.Union(&e.v) + } else { + // Do not add pendingMetricIDs from the previous day to the cyrrent day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + pendingMetricIDs = &uint64set.Set{} } eNew := &byDateMetricIDEntry{ date: date, @@ -2434,7 +2442,7 @@ func (s *Storage) updateNextDayMetricIDs() { s.nextDayMetricIDs.Store(eNew) } -func (s *Storage) updateCurrHourMetricIDs() { +func (s *Storage) updateCurrHourMetricIDs(hour uint64) { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) var newEntries []pendingHourMetricIDEntry s.pendingHourEntriesLock.Lock() @@ -2453,7 +2461,6 @@ func (s *Storage) updateCurrHourMetricIDs() { } s.pendingHourEntriesLock.Unlock() - hour := fasttime.UnixHour() if len(newEntries) == 0 && hm.hour == hour { // Fast path: nothing to update. return @@ -2473,18 +2480,23 @@ func (s *Storage) updateCurrHourMetricIDs() { byTenant = make(map[accountProjectKey]*uint64set.Set) } - for _, x := range newEntries { - m.Add(x.MetricID) - k := accountProjectKey{ - AccountID: x.AccountID, - ProjectID: x.ProjectID, + if hour%24 != 0 { + // Do not add pending metricIDs from the previous hour to the current hour on the next day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + for _, x := range newEntries { + m.Add(x.MetricID) + k := accountProjectKey{ + AccountID: x.AccountID, + ProjectID: x.ProjectID, + } + e := byTenant[k] + if e == nil { + e = &uint64set.Set{} + byTenant[k] = e + } + e.Add(x.MetricID) } - e := byTenant[k] - if e == nil { - e = &uint64set.Set{} - byTenant[k] = e - } - e.Add(x.MetricID) } hmNew := &hourMetricIDs{ diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 18fd93188..521cce058 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -11,6 +11,7 @@ import ( "testing/quick" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -154,7 +155,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { } t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: 123, @@ -162,7 +163,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. @@ -186,7 +187,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -194,7 +195,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. @@ -243,7 +244,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { x.Add(e.MetricID) } - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: 123, @@ -251,7 +252,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. @@ -300,7 +301,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { x.Add(e.MetricID) } - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -308,7 +309,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { // It is possible new hour occurred. Update the hour and verify it again. @@ -342,6 +343,43 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0) } }) + t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) { + s := newStorage() + + hour := fasttime.UnixHour() + hour -= hour % 24 + + s.pendingHourEntries = []pendingHourMetricIDEntry{ + {AccountID: 123, ProjectID: 431, MetricID: 343}, + {AccountID: 123, ProjectID: 431, MetricID: 32424}, + {AccountID: 1, ProjectID: 2, MetricID: 8293432}, + } + + hmOrig := &hourMetricIDs{ + m: &uint64set.Set{}, + hour: hour - 1, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs(hour) + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + if hmCurr.m.Len() != 0 { + t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil)) + } + byTenantExpected := make(map[accountProjectKey]*uint64set.Set) + if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) { + t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected) + } + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + if !reflect.DeepEqual(hmPrev, hmOrig) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) + } + if len(s.pendingHourEntries) != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0) + } + }) } func TestMetricRowMarshalUnmarshal(t *testing.T) {