From 790768f20b7ed83af315fcc37ee15d3fd11b33e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Marsza=C5=82?= Date: Mon, 7 Nov 2022 12:55:37 +0100 Subject: [PATCH 1/6] Fix issue-3309 - currHourMetricIDs shouldn't contain metrics from prev hour (#3320) * fix issue-3309 currHourMetricIDs shouldn't contain metrics from prev hour * Update storage.go --- lib/storage/storage.go | 38 +++++++++----- lib/storage/storage_test.go | 98 ++++++++++++++++++++++++------------- 2 files changed, 90 insertions(+), 46 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dc732cf9a..21c3a3528 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -98,7 +98,7 @@ type Storage struct { // Pending MetricID values to be added to currHourMetricIDs. pendingHourEntriesLock sync.Mutex - pendingHourEntries *uint64set.Set + pendingHourEntries pendingHourEntries // Pending MetricIDs to be added to nextDayMetricIDs. pendingNextDayMetricIDsLock sync.Mutex @@ -213,7 +213,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - s.pendingHourEntries = &uint64set.Set{} + s.pendingHourEntries = pendingHourEntries{{}, {}} date := fasttime.UnixDate() nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date) @@ -700,10 +700,12 @@ func (s *Storage) currHourMetricIDsUpdater() { for { select { case <-s.stop: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) return case <-ticker.C: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) } } } @@ -768,7 +770,7 @@ func (s *Storage) mustRotateIndexDB() { // So queries for the last 24 hours stop returning samples added at step 3. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 s.pendingHourEntriesLock.Lock() - s.pendingHourEntries = &uint64set.Set{} + s.pendingHourEntries = pendingHourEntries{{}, {}} s.pendingHourEntriesLock.Unlock() s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) @@ -2027,7 +2029,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } if len(pendingHourEntries) > 0 { s.pendingHourEntriesLock.Lock() - s.pendingHourEntries.AddMulti(pendingHourEntries) + s.pendingHourEntries.At(hm.hour).AddMulti(pendingHourEntries) s.pendingHourEntriesLock.Unlock() } if len(pendingDateMetricIDs) == 0 { @@ -2321,15 +2323,24 @@ func (s *Storage) updateNextDayMetricIDs() { s.nextDayMetricIDs.Store(eNew) } -func (s *Storage) updateCurrHourMetricIDs() { +type pendingHourEntries [2]*uint64set.Set + +func (p pendingHourEntries) At(hour uint64) *uint64set.Set { + return p[hour%2] +} + +func (p pendingHourEntries) Len() int { + return p[0].Len() + p[1].Len() +} + +func (s *Storage) updateCurrHourMetricIDs(hour uint64) { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourEntriesLock.Lock() newMetricIDs := s.pendingHourEntries - s.pendingHourEntries = &uint64set.Set{} + s.pendingHourEntries = pendingHourEntries{{}, {}} s.pendingHourEntriesLock.Unlock() - hour := fasttime.UnixHour() - if newMetricIDs.Len() == 0 && hm.hour == hour { + if newMetricIDs.At(hour).Len() == 0 && hm.hour == hour { // Fast path: nothing to update. return } @@ -2343,7 +2354,7 @@ func (s *Storage) updateCurrHourMetricIDs() { m = &uint64set.Set{} isFull = true } - m.Union(newMetricIDs) + m.Union(newMetricIDs.At(hour)) hmNew := &hourMetricIDs{ m: m, hour: hour, @@ -2351,6 +2362,11 @@ func (s *Storage) updateCurrHourMetricIDs() { } s.currHourMetricIDs.Store(hmNew) if hm.hour != hour { + if hm.m != nil { + hm.m.Union(newMetricIDs.At(hm.hour)) + } else { + hm.m = newMetricIDs.At(hm.hour) + } s.prevHourMetricIDs.Store(hm) } } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index d9cae957a..03d48a362 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -150,7 +150,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { var s Storage s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourEntries = &uint64set.Set{} + s.pendingHourEntries = pendingHourEntries{{}, {}} return &s } t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { @@ -158,19 +158,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: 123, + hour: hour - 1, } hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if hmCurr.m.Len() != 0 { t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0) @@ -198,16 +194,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } - // Do not run other checks, since they may fail. - return + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if !reflect.DeepEqual(hmCurr, hmOrig) { t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig) @@ -228,28 +218,25 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + pendingHourEntries := &uint64set.Set{} pendingHourEntries.Add(343) pendingHourEntries.Add(32424) pendingHourEntries.Add(8293432) - s.pendingHourEntries = pendingHourEntries + s.pendingHourEntries.At(hour).Union(pendingHourEntries) - hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: 123, + hour: hour - 1, } hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if !hmCurr.m.Equal(pendingHourEntries) { t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries) @@ -269,13 +256,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() + + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + pendingHourEntries := &uint64set.Set{} pendingHourEntries.Add(343) pendingHourEntries.Add(32424) pendingHourEntries.Add(8293432) - s.pendingHourEntries = pendingHourEntries + s.pendingHourEntries.At(hour).Union(pendingHourEntries) - hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -283,16 +272,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } - // Do not run other checks, since they may fail. - return + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } m := pendingHourEntries.Clone() hmOrig.m.ForEach(func(part []uint64) bool { @@ -314,6 +297,51 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty) } + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) + } + }) + t.Run("nonempty_pending_metric_ids_from_previous_hour_stale_hour", func(t *testing.T) { + s := newStorage() + + hour := uint64(timestampFromTime(time.Now())) / msecPerHour + + pendingPreviousHourEntries := &uint64set.Set{} + pendingPreviousHourEntries.Add(343) + pendingPreviousHourEntries.Add(32424) + pendingPreviousHourEntries.Add(8293432) + s.pendingHourEntries.At(hour - 1).Union(pendingPreviousHourEntries) + + pendingCurrentHourEntries := &uint64set.Set{} + pendingCurrentHourEntries.Add(12) + pendingCurrentHourEntries.Add(34) + s.pendingHourEntries.At(hour).Union(pendingCurrentHourEntries) + + hmOrig := &hourMetricIDs{ + m: &uint64set.Set{}, + hour: hour - 1, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs(hour) + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + if !hmCurr.m.Equal(pendingCurrentHourEntries) { + t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingCurrentHourEntries) + } + if !hmCurr.isFull { + t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) + } + + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + if !hmPrev.m.Equal(pendingPreviousHourEntries) { + t.Fatalf("unexpected hmPrev.m; got %v; want %v", hmPrev.m, pendingPreviousHourEntries) + } + if hmPrev.isFull { + t.Fatalf("unexpected hmPrev.isFull; got %v; want %v", hmPrev.isFull, false) + } + if s.pendingHourEntries.Len() != 0 { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } From dd88c628aac28159ee64dd83eaef77dc59fcd7fc Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 7 Nov 2022 13:06:50 +0200 Subject: [PATCH 2/6] lib/storage: remove unused isFull field from hourMetricIDs struct --- lib/storage/storage.go | 20 ++++---------------- lib/storage/storage_test.go | 12 ------------ 2 files changed, 4 insertions(+), 28 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 21c3a3528..dc2dc5b53 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -903,8 +903,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs } // Unmarshal header - isFull := encoding.UnmarshalUint64(src) - src = src[8:] hourLoaded := encoding.UnmarshalUint64(src) src = src[8:] if hourLoaded != hour { @@ -923,7 +921,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs return hm } hm.m = m - hm.isFull = isFull != 0 logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen) return hm } @@ -952,13 +949,8 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { logger.Infof("saving %s to %q...", name, path) startTime := time.Now() dst := make([]byte, 0, hm.m.Len()*8+24) - isFull := uint64(0) - if hm.isFull { - isFull = 1 - } // Marshal header - dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, hm.hour) // Marshal hm.m @@ -2347,18 +2339,15 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) { // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set - isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() } else { m = &uint64set.Set{} - isFull = true } m.Union(newMetricIDs.At(hour)) hmNew := &hourMetricIDs{ - m: m, - hour: hour, - isFull: isFull, + m: m, + hour: hour, } s.currHourMetricIDs.Store(hmNew) if hm.hour != hour { @@ -2372,9 +2361,8 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) { } type hourMetricIDs struct { - m *uint64set.Set - hour uint64 - isFull bool + m *uint64set.Set + hour uint64 } type generationTSID struct { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 03d48a362..6dc94a4a0 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -171,9 +171,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if hmCurr.m.Len() != 0 { t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0) } - if !hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) if !reflect.DeepEqual(hmPrev, hmOrig) { @@ -202,9 +199,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !reflect.DeepEqual(hmCurr, hmOrig) { t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig) } - if hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmEmpty := &hourMetricIDs{} @@ -241,9 +235,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !hmCurr.m.Equal(pendingHourEntries) { t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries) } - if !hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) if !reflect.DeepEqual(hmPrev, hmOrig) { @@ -287,9 +278,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !hmCurr.m.Equal(m) { t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) } - if hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmEmpty := &hourMetricIDs{} From 116811d76132681d75ee37d85d7b1423e5a826ac Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 7 Nov 2022 13:15:51 +0200 Subject: [PATCH 3/6] lib/envtemplate: allow non-env var names inside "%{ ... }" --- lib/envtemplate/envtemplate.go | 12 +++++++++++- lib/envtemplate/envtemplate_test.go | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/envtemplate/envtemplate.go b/lib/envtemplate/envtemplate.go index 9d06435bf..6ad779d63 100644 --- a/lib/envtemplate/envtemplate.go +++ b/lib/envtemplate/envtemplate.go @@ -5,6 +5,7 @@ import ( "io" "log" "os" + "regexp" "strings" "github.com/valyala/fasttemplate" @@ -93,14 +94,23 @@ func expand(m map[string]string, s string) (string, error) { return s, nil } result, err := fasttemplate.ExecuteFuncStringWithErr(s, "%{", "}", func(w io.Writer, tag string) (int, error) { + if !isValidEnvVarName(tag) { + return fmt.Fprintf(w, "%%{%s}", tag) + } v, ok := m[tag] if !ok { return 0, fmt.Errorf("missing %q env var", tag) } - return w.Write([]byte(v)) + return fmt.Fprintf(w, "%s", v) }) if err != nil { return "", err } return result, nil } + +func isValidEnvVarName(s string) bool { + return envVarNameRegex.MatchString(s) +} + +var envVarNameRegex = regexp.MustCompile("^[a-zA-Z0-9_]+$") diff --git a/lib/envtemplate/envtemplate_test.go b/lib/envtemplate/envtemplate_test.go index 375ece944..cdd7b9ab3 100644 --- a/lib/envtemplate/envtemplate_test.go +++ b/lib/envtemplate/envtemplate_test.go @@ -24,6 +24,7 @@ func TestExpandTemplates(t *testing.T) { f([]string{"foo=%{bar}", "bar=x"}, []string{"bar=x", "foo=x"}) f([]string{"a=x%{b}", "b=y%{c}z%{d}", "c=123", "d=qwe"}, []string{"a=xy123zqwe", "b=y123zqwe", "c=123", "d=qwe"}) f([]string{"a=x%{b}y", "b=z%{a}q", "c"}, []string{"a=xzxzxzxz%{a}qyqyqyqy", "b=zxzxzxzx%{b}yqyqyqyq", "c="}) + f([]string{"a=%{x.y}"}, []string{"a=%{x.y}"}) } func TestLookupEnv(t *testing.T) { @@ -70,6 +71,7 @@ func TestReplaceSuccess(t *testing.T) { f("", "") f("foo", "foo") f("a %{foo}-x", "a bar-x") + f("%{foo.bar}", "%{foo.bar}") } func TestReplaceFailure(t *testing.T) { From f9dc3da9e2c5a6d465344bc2b70e88cf28e97ae6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 7 Nov 2022 13:57:56 +0200 Subject: [PATCH 4/6] lib/storage: typo fix after 32d48f8dfbb03174858c00bdfe6d9d22431dc8d8 --- lib/storage/storage.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dc2dc5b53..27ce794b4 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -897,8 +897,8 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs logger.Panicf("FATAL: cannot read %s: %s", path, err) } srcOrigLen := len(src) - if len(src) < 24 { - logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) + if len(src) < 16 { + logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16) return hm } From daa70e6560b23e931c56fe38614ca745c727f514 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 7 Nov 2022 14:04:06 +0200 Subject: [PATCH 5/6] lib/storage: follow-up for 790768f20b7ed83af315fcc37ee15d3fd11b33e3 - Document the bugfix at docs/CHANGELOG.md - Simplify the bugfix a bit --- docs/CHANGELOG.md | 2 ++ lib/storage/storage.go | 48 ++++++++++++++---------------- lib/storage/storage_test.go | 58 ++++++++++++++----------------------- 3 files changed, 46 insertions(+), 62 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1d6371ee8..3aef080f2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320). + ## [v1.83.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.83.0) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 27ce794b4..016830cb6 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -98,7 +98,7 @@ type Storage struct { // Pending MetricID values to be added to currHourMetricIDs. pendingHourEntriesLock sync.Mutex - pendingHourEntries pendingHourEntries + pendingHourEntries *uint64set.Set // Pending MetricIDs to be added to nextDayMetricIDs. pendingNextDayMetricIDsLock sync.Mutex @@ -213,7 +213,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer hmPrev := s.mustLoadHourMetricIDs(hour-1, "prev_hour_metric_ids") s.currHourMetricIDs.Store(hmCurr) s.prevHourMetricIDs.Store(hmPrev) - s.pendingHourEntries = pendingHourEntries{{}, {}} + s.pendingHourEntries = &uint64set.Set{} date := fasttime.UnixDate() nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date) @@ -718,10 +718,12 @@ func (s *Storage) nextDayMetricIDsUpdater() { for { select { case <-s.stop: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) return case <-ticker.C: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) } } } @@ -770,7 +772,7 @@ func (s *Storage) mustRotateIndexDB() { // So queries for the last 24 hours stop returning samples added at step 3. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 s.pendingHourEntriesLock.Lock() - s.pendingHourEntries = pendingHourEntries{{}, {}} + s.pendingHourEntries = &uint64set.Set{} s.pendingHourEntriesLock.Unlock() s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) @@ -2021,7 +2023,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } if len(pendingHourEntries) > 0 { s.pendingHourEntriesLock.Lock() - s.pendingHourEntries.At(hm.hour).AddMulti(pendingHourEntries) + s.pendingHourEntries.AddMulti(pendingHourEntries) s.pendingHourEntriesLock.Unlock() } if len(pendingDateMetricIDs) == 0 { @@ -2292,8 +2294,7 @@ type byDateMetricIDEntry struct { v uint64set.Set } -func (s *Storage) updateNextDayMetricIDs() { - date := fasttime.UnixDate() +func (s *Storage) updateNextDayMetricIDs(date uint64) { e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs @@ -2307,6 +2308,11 @@ func (s *Storage) updateNextDayMetricIDs() { // Slow path: union pendingMetricIDs with e.v if e.date == date { pendingMetricIDs.Union(&e.v) + } else { + // Do not add pendingMetricIDs from the previous day to the cyrrent day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + pendingMetricIDs = &uint64set.Set{} } eNew := &byDateMetricIDEntry{ date: date, @@ -2315,24 +2321,14 @@ func (s *Storage) updateNextDayMetricIDs() { s.nextDayMetricIDs.Store(eNew) } -type pendingHourEntries [2]*uint64set.Set - -func (p pendingHourEntries) At(hour uint64) *uint64set.Set { - return p[hour%2] -} - -func (p pendingHourEntries) Len() int { - return p[0].Len() + p[1].Len() -} - func (s *Storage) updateCurrHourMetricIDs(hour uint64) { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourEntriesLock.Lock() newMetricIDs := s.pendingHourEntries - s.pendingHourEntries = pendingHourEntries{{}, {}} + s.pendingHourEntries = &uint64set.Set{} s.pendingHourEntriesLock.Unlock() - if newMetricIDs.At(hour).Len() == 0 && hm.hour == hour { + if newMetricIDs.Len() == 0 && hm.hour == hour { // Fast path: nothing to update. return } @@ -2344,18 +2340,18 @@ func (s *Storage) updateCurrHourMetricIDs(hour uint64) { } else { m = &uint64set.Set{} } - m.Union(newMetricIDs.At(hour)) + if hour%24 != 0 { + // Do not add pending metricIDs from the previous hour to the current hour on the next day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + m.Union(newMetricIDs) + } hmNew := &hourMetricIDs{ m: m, hour: hour, } s.currHourMetricIDs.Store(hmNew) if hm.hour != hour { - if hm.m != nil { - hm.m.Union(newMetricIDs.At(hm.hour)) - } else { - hm.m = newMetricIDs.At(hm.hour) - } s.prevHourMetricIDs.Store(hm) } } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 6dc94a4a0..1a01c48ea 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -11,6 +11,7 @@ import ( "testing/quick" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -150,15 +151,15 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { var s Storage s.currHourMetricIDs.Store(&hourMetricIDs{}) s.prevHourMetricIDs.Store(&hourMetricIDs{}) - s.pendingHourEntries = pendingHourEntries{{}, {}} + s.pendingHourEntries = &uint64set.Set{} return &s } t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: hour - 1, + hour: 123, } hmOrig.m.Add(12) hmOrig.m.Add(34) @@ -183,7 +184,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -212,17 +213,16 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour - pendingHourEntries := &uint64set.Set{} pendingHourEntries.Add(343) pendingHourEntries.Add(32424) pendingHourEntries.Add(8293432) - s.pendingHourEntries.At(hour).Union(pendingHourEntries) + s.pendingHourEntries = pendingHourEntries + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - hour: hour - 1, + hour: 123, } hmOrig.m.Add(12) hmOrig.m.Add(34) @@ -247,15 +247,13 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - - hour := uint64(timestampFromTime(time.Now())) / msecPerHour - pendingHourEntries := &uint64set.Set{} pendingHourEntries.Add(343) pendingHourEntries.Add(32424) pendingHourEntries.Add(8293432) - s.pendingHourEntries.At(hour).Union(pendingHourEntries) + s.pendingHourEntries = pendingHourEntries + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -289,21 +287,17 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) - t.Run("nonempty_pending_metric_ids_from_previous_hour_stale_hour", func(t *testing.T) { + t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() + hour -= hour % 24 - pendingPreviousHourEntries := &uint64set.Set{} - pendingPreviousHourEntries.Add(343) - pendingPreviousHourEntries.Add(32424) - pendingPreviousHourEntries.Add(8293432) - s.pendingHourEntries.At(hour - 1).Union(pendingPreviousHourEntries) - - pendingCurrentHourEntries := &uint64set.Set{} - pendingCurrentHourEntries.Add(12) - pendingCurrentHourEntries.Add(34) - s.pendingHourEntries.At(hour).Union(pendingCurrentHourEntries) + pendingHourEntries := &uint64set.Set{} + pendingHourEntries.Add(343) + pendingHourEntries.Add(32424) + pendingHourEntries.Add(8293432) + s.pendingHourEntries = pendingHourEntries hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, @@ -315,21 +309,13 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if hmCurr.hour != hour { t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } - if !hmCurr.m.Equal(pendingCurrentHourEntries) { - t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingCurrentHourEntries) + if hmCurr.m.Len() != 0 { + t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil)) } - if !hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) - } - hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) - if !hmPrev.m.Equal(pendingPreviousHourEntries) { - t.Fatalf("unexpected hmPrev.m; got %v; want %v", hmPrev.m, pendingPreviousHourEntries) + if !reflect.DeepEqual(hmPrev, hmOrig) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) } - if hmPrev.isFull { - t.Fatalf("unexpected hmPrev.isFull; got %v; want %v", hmPrev.isFull, false) - } - if s.pendingHourEntries.Len() != 0 { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } From 833262203738278f9cfaa3d14b879f6afbe467fb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 7 Nov 2022 14:58:35 +0200 Subject: [PATCH 6/6] vendor: update github.com/urfave/cli/v2 from v2.23.2 to v2.23.4 --- go.mod | 2 +- go.sum | 4 ++-- vendor/github.com/urfave/cli/v2/godoc-current.txt | 8 ++++++++ vendor/github.com/urfave/cli/v2/help.go | 2 +- vendor/modules.txt | 2 +- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index a2d7f30da..b38dac93a 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/influxdata/influxdb v1.10.0 github.com/klauspost/compress v1.15.12 github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9 - github.com/urfave/cli/v2 v2.23.2 + github.com/urfave/cli/v2 v2.23.4 github.com/valyala/fastjson v1.6.3 github.com/valyala/fastrand v1.1.0 github.com/valyala/fasttemplate v1.2.2 diff --git a/go.sum b/go.sum index 92ddbba2a..f0a2434bf 100644 --- a/go.sum +++ b/go.sum @@ -834,8 +834,8 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/urfave/cli/v2 v2.23.2 h1:34bT/FlchakhE5j+PggFYUfiGZBnrNxJRRVB9AQODOo= -github.com/urfave/cli/v2 v2.23.2/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= +github.com/urfave/cli/v2 v2.23.4 h1:gcaHwki8kGX6lfp2zz7irxu7eZkcIl1Xapt6XW0Ynqc= +github.com/urfave/cli/v2 v2.23.4/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= diff --git a/vendor/github.com/urfave/cli/v2/godoc-current.txt b/vendor/github.com/urfave/cli/v2/godoc-current.txt index 54424fb1e..b8dbf6ad0 100644 --- a/vendor/github.com/urfave/cli/v2/godoc-current.txt +++ b/vendor/github.com/urfave/cli/v2/godoc-current.txt @@ -2437,6 +2437,7 @@ type InputSourceContext interface { String(name string) (string, error) StringSlice(name string) ([]string, error) IntSlice(name string) ([]int, error) + Int64Slice(name string) ([]int64, error) Generic(name string) (cli.Generic, error) Bool(name string) (bool, error) @@ -2494,6 +2495,9 @@ func (f *Int64SliceFlag) Apply(set *flag.FlagSet) error Apply saves the flagSet for later usage calls, then calls the wrapped Int64SliceFlag.Apply +func (f *Int64SliceFlag) ApplyInputSourceValue(cCtx *cli.Context, isc InputSourceContext) error + ApplyInputSourceValue applies a Int64Slice value if required + type IntFlag struct { *cli.IntFlag // Has unexported fields. @@ -2554,6 +2558,10 @@ func (fsm *MapInputSource) Generic(name string) (cli.Generic, error) func (fsm *MapInputSource) Int(name string) (int, error) Int returns an int from the map if it exists otherwise returns 0 +func (fsm *MapInputSource) Int64Slice(name string) ([]int64, error) + Int64Slice returns an []int64 from the map if it exists otherwise returns + nil + func (fsm *MapInputSource) IntSlice(name string) ([]int, error) IntSlice returns an []int from the map if it exists otherwise returns nil diff --git a/vendor/github.com/urfave/cli/v2/help.go b/vendor/github.com/urfave/cli/v2/help.go index 6dc593b34..2ccd3b71e 100644 --- a/vendor/github.com/urfave/cli/v2/help.go +++ b/vendor/github.com/urfave/cli/v2/help.go @@ -246,7 +246,7 @@ func ShowCommandHelp(ctx *Context, command string) error { } for _, c := range commands { if c.HasName(command) { - if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 { + if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 && c.Command(helpName) == nil { c.Subcommands = append(c.Subcommands, helpCommandDontUse) } if !ctx.App.HideHelp && HelpFlag != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index fb4eb4a2a..4846f630e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -333,7 +333,7 @@ github.com/rivo/uniseg # github.com/russross/blackfriday/v2 v2.1.0 ## explicit github.com/russross/blackfriday/v2 -# github.com/urfave/cli/v2 v2.23.2 +# github.com/urfave/cli/v2 v2.23.4 ## explicit; go 1.18 github.com/urfave/cli/v2 # github.com/valyala/bytebufferpool v1.0.0