lib/storage: follow-up for 790768f20b

- Document the bugfix at docs/CHANGELOG.md
- Simplify the bugfix a bit
This commit is contained in:
Aliaksandr Valialkin 2022-11-07 14:04:06 +02:00
parent f9dc3da9e2
commit daa70e6560
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 46 additions and 62 deletions

View file

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320).
## [v1.83.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.83.0)

View file

@ -98,7 +98,7 @@ type Storage struct {
// Pending MetricID values to be added to currHourMetricIDs.
pendingHourEntriesLock sync.Mutex
pendingHourEntries pendingHourEntries
pendingHourEntries *uint64set.Set
// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync.Mutex
@ -213,7 +213,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids")
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
s.pendingHourEntries = pendingHourEntries{{}, {}}
s.pendingHourEntries = &uint64set.Set{}
date := fasttime.UnixDate()
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
@ -718,10 +718,12 @@ func (s *Storage) nextDayMetricIDsUpdater() {
for {
select {
case <-s.stop:
s.updateNextDayMetricIDs()
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
return
case <-ticker.C:
s.updateNextDayMetricIDs()
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
}
}
}
@ -770,7 +772,7 @@ func (s *Storage) mustRotateIndexDB() {
// So queries for the last 24 hours stop returning samples added at step 3.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries = pendingHourEntries{{}, {}}
s.pendingHourEntries = &uint64set.Set{}
s.pendingHourEntriesLock.Unlock()
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
@ -2021,7 +2023,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
if len(pendingHourEntries) > 0 {
s.pendingHourEntriesLock.Lock()
s.pendingHourEntries.At(hm.hour).AddMulti(pendingHourEntries)
s.pendingHourEntries.AddMulti(pendingHourEntries)
s.pendingHourEntriesLock.Unlock()
}
if len(pendingDateMetricIDs) == 0 {
@ -2292,8 +2294,7 @@ type byDateMetricIDEntry struct {
v uint64set.Set
}
func (s *Storage) updateNextDayMetricIDs() {
date := fasttime.UnixDate()
func (s *Storage) updateNextDayMetricIDs(date uint64) {
e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
s.pendingNextDayMetricIDsLock.Lock()
pendingMetricIDs := s.pendingNextDayMetricIDs
@ -2307,6 +2308,11 @@ func (s *Storage) updateNextDayMetricIDs() {
// Slow path: union pendingMetricIDs with e.v
if e.date == date {
pendingMetricIDs.Union(&e.v)
} else {
// Do not add pendingMetricIDs from the previous day to the cyrrent day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
pendingMetricIDs = &uint64set.Set{}
}
eNew := &byDateMetricIDEntry{
date: date,
@ -2315,24 +2321,14 @@ func (s *Storage) updateNextDayMetricIDs() {
s.nextDayMetricIDs.Store(eNew)
}
type pendingHourEntries [2]*uint64set.Set
func (p pendingHourEntries) At(hour uint64) *uint64set.Set {
return p[hour%2]
}
func (p pendingHourEntries) Len() int {
return p[0].Len() + p[1].Len()
}
func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourEntriesLock.Lock()
newMetricIDs := s.pendingHourEntries
s.pendingHourEntries = pendingHourEntries{{}, {}}
s.pendingHourEntries = &uint64set.Set{}
s.pendingHourEntriesLock.Unlock()
if newMetricIDs.At(hour).Len() == 0 && hm.hour == hour {
if newMetricIDs.Len() == 0 && hm.hour == hour {
// Fast path: nothing to update.
return
}
@ -2344,18 +2340,18 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
} else {
m = &uint64set.Set{}
}
m.Union(newMetricIDs.At(hour))
if hour%24 != 0 {
// Do not add pending metricIDs from the previous hour to the current hour on the next day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
m.Union(newMetricIDs)
}
hmNew := &hourMetricIDs{
m: m,
hour: hour,
}
s.currHourMetricIDs.Store(hmNew)
if hm.hour != hour {
if hm.m != nil {
hm.m.Union(newMetricIDs.At(hm.hour))
} else {
hm.m = newMetricIDs.At(hm.hour)
}
s.prevHourMetricIDs.Store(hm)
}
}

View file

@ -11,6 +11,7 @@ import (
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
@ -150,15 +151,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourEntries = pendingHourEntries{{}, {}}
s.pendingHourEntries = &uint64set.Set{}
return &s
}
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
@ -183,7 +184,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
})
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
@ -212,17 +213,16 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries.At(hour).Union(pendingHourEntries)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
hour: 123,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
@ -247,15 +247,13 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries.At(hour).Union(pendingHourEntries)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
@ -289,21 +287,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_from_previous_hour_stale_hour", func(t *testing.T) {
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hour -= hour % 24
pendingPreviousHourEntries := &uint64set.Set{}
pendingPreviousHourEntries.Add(343)
pendingPreviousHourEntries.Add(32424)
pendingPreviousHourEntries.Add(8293432)
s.pendingHourEntries.At(hour - 1).Union(pendingPreviousHourEntries)
pendingCurrentHourEntries := &uint64set.Set{}
pendingCurrentHourEntries.Add(12)
pendingCurrentHourEntries.Add(34)
s.pendingHourEntries.At(hour).Union(pendingCurrentHourEntries)
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
@ -315,21 +309,13 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
if !hmCurr.m.Equal(pendingCurrentHourEntries) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingCurrentHourEntries)
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
}
if !hmCurr.isFull {
t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
if !hmPrev.m.Equal(pendingPreviousHourEntries) {
t.Fatalf("unexpected hmPrev.m; got %v; want %v", hmPrev.m, pendingPreviousHourEntries)
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if hmPrev.isFull {
t.Fatalf("unexpected hmPrev.isFull; got %v; want %v", hmPrev.isFull, false)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}