mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: follow-up for 790768f20b
- Document the bugfix at docs/CHANGELOG.md - Simplify the bugfix a bit
This commit is contained in:
parent
3d7e03cffa
commit
1fbf94a1c4
3 changed files with 64 additions and 17 deletions
|
@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
* SECURITY: update Go builder to v1.19.3. This fixes [CVE-2022 security issue](https://github.com/golang/go/issues/56328). See [the changelog](https://github.com/golang/go/issues?q=milestone%3AGo1.19.3+label%3ACherryPickApproved).
|
||||
|
||||
* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level.
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly escape string passed to `quotesEscape` [template function](https://docs.victoriametrics.com/vmalert.html#template-functions), so it can be safely embedded into JSON string. This makes obsolete the `crlfEscape` function. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3139) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890) issue.
|
||||
|
|
|
@ -699,10 +699,12 @@ func (s *Storage) currHourMetricIDsUpdater() {
|
|||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
s.updateCurrHourMetricIDs()
|
||||
hour := fasttime.UnixHour()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateCurrHourMetricIDs()
|
||||
hour := fasttime.UnixHour()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -715,10 +717,12 @@ func (s *Storage) nextDayMetricIDsUpdater() {
|
|||
for {
|
||||
select {
|
||||
case <-s.stop:
|
||||
s.updateNextDayMetricIDs()
|
||||
date := fasttime.UnixDate()
|
||||
s.updateNextDayMetricIDs(date)
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.updateNextDayMetricIDs()
|
||||
date := fasttime.UnixDate()
|
||||
s.updateNextDayMetricIDs(date)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2351,8 +2355,7 @@ type byDateMetricIDEntry struct {
|
|||
v uint64set.Set
|
||||
}
|
||||
|
||||
func (s *Storage) updateNextDayMetricIDs() {
|
||||
date := fasttime.UnixDate()
|
||||
func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
|
||||
s.pendingNextDayMetricIDsLock.Lock()
|
||||
pendingMetricIDs := s.pendingNextDayMetricIDs
|
||||
|
@ -2366,6 +2369,11 @@ func (s *Storage) updateNextDayMetricIDs() {
|
|||
// Slow path: union pendingMetricIDs with e.v
|
||||
if e.date == date {
|
||||
pendingMetricIDs.Union(&e.v)
|
||||
} else {
|
||||
// Do not add pendingMetricIDs from the previous day to the cyrrent day,
|
||||
// since this may result in missing registration of the metricIDs in the per-day inverted index.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
|
||||
pendingMetricIDs = &uint64set.Set{}
|
||||
}
|
||||
eNew := &byDateMetricIDEntry{
|
||||
date: date,
|
||||
|
@ -2374,14 +2382,13 @@ func (s *Storage) updateNextDayMetricIDs() {
|
|||
s.nextDayMetricIDs.Store(eNew)
|
||||
}
|
||||
|
||||
func (s *Storage) updateCurrHourMetricIDs() {
|
||||
func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
|
||||
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
s.pendingHourEntriesLock.Lock()
|
||||
newMetricIDs := s.pendingHourEntries
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
s.pendingHourEntriesLock.Unlock()
|
||||
|
||||
hour := fasttime.UnixHour()
|
||||
if newMetricIDs.Len() == 0 && hm.hour == hour {
|
||||
// Fast path: nothing to update.
|
||||
return
|
||||
|
@ -2396,7 +2403,12 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
|||
m = &uint64set.Set{}
|
||||
isFull = true
|
||||
}
|
||||
m.Union(newMetricIDs)
|
||||
if hour%24 != 0 {
|
||||
// Do not add pending metricIDs from the previous hour to the current hour on the next day,
|
||||
// since this may result in missing registration of the metricIDs in the per-day inverted index.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
|
||||
m.Union(newMetricIDs)
|
||||
}
|
||||
hmNew := &hourMetricIDs{
|
||||
m: m,
|
||||
hour: hour,
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"testing/quick"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
|
@ -155,7 +156,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
}
|
||||
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hour := fasttime.UnixHour()
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: 123,
|
||||
|
@ -163,7 +164,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
|
@ -190,7 +191,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
})
|
||||
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hour := fasttime.UnixHour()
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: hour,
|
||||
|
@ -198,7 +199,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
|
@ -234,7 +235,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
pendingHourEntries.Add(8293432)
|
||||
s.pendingHourEntries = pendingHourEntries
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hour := fasttime.UnixHour()
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: 123,
|
||||
|
@ -242,7 +243,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
|
@ -275,7 +276,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
pendingHourEntries.Add(8293432)
|
||||
s.pendingHourEntries = pendingHourEntries
|
||||
|
||||
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
|
||||
hour := fasttime.UnixHour()
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: hour,
|
||||
|
@ -283,7 +284,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
hmOrig.m.Add(12)
|
||||
hmOrig.m.Add(34)
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs()
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
// It is possible new hour occurred. Update the hour and verify it again.
|
||||
|
@ -318,6 +319,39 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|||
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
||||
}
|
||||
})
|
||||
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
|
||||
s := newStorage()
|
||||
|
||||
hour := fasttime.UnixHour()
|
||||
hour -= hour % 24
|
||||
|
||||
pendingHourEntries := &uint64set.Set{}
|
||||
pendingHourEntries.Add(343)
|
||||
pendingHourEntries.Add(32424)
|
||||
pendingHourEntries.Add(8293432)
|
||||
s.pendingHourEntries = pendingHourEntries
|
||||
|
||||
hmOrig := &hourMetricIDs{
|
||||
m: &uint64set.Set{},
|
||||
hour: hour - 1,
|
||||
}
|
||||
s.currHourMetricIDs.Store(hmOrig)
|
||||
s.updateCurrHourMetricIDs(hour)
|
||||
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if hmCurr.hour != hour {
|
||||
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
||||
}
|
||||
if hmCurr.m.Len() != 0 {
|
||||
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
|
||||
}
|
||||
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
||||
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
||||
}
|
||||
if s.pendingHourEntries.Len() != 0 {
|
||||
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetricRowMarshalUnmarshal(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue