lib/storage: follow-up for 790768f20b

- Document the bugfix at docs/CHANGELOG.md
- Simplify the bugfix a bit
This commit is contained in:
Aliaksandr Valialkin 2022-11-07 14:04:06 +02:00
parent 4eec43bf26
commit fe6868cb8e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 78 additions and 27 deletions

View file

@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* SECURITY: update Go builder to v1.19.3. This fixes [CVE-2022 security issue](https://github.com/golang/go/issues/56328). See [the changelog](https://github.com/golang/go/issues?q=milestone%3AGo1.19.3+label%3ACherryPickApproved).
* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level.
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly escape string passed to `quotesEscape` [template function](https://docs.victoriametrics.com/vmalert.html#template-functions), so it can be safely embedded into JSON string. This makes obsolete the `crlfEscape` function. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3139) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890) issue.

View file

@ -714,10 +714,12 @@ func (s *Storage) currHourMetricIDsUpdater() {
for {
select {
case <-s.stop:
s.updateCurrHourMetricIDs()
hour := fasttime.UnixHour()
s.updateCurrHourMetricIDs(hour)
return
case <-ticker.C:
s.updateCurrHourMetricIDs()
hour := fasttime.UnixHour()
s.updateCurrHourMetricIDs(hour)
}
}
}
@ -730,10 +732,12 @@ func (s *Storage) nextDayMetricIDsUpdater() {
for {
select {
case <-s.stop:
s.updateNextDayMetricIDs()
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
return
case <-ticker.C:
s.updateNextDayMetricIDs()
date := fasttime.UnixDate()
s.updateNextDayMetricIDs(date)
}
}
}
@ -2477,8 +2481,7 @@ type byDateMetricIDEntry struct {
v uint64set.Set
}
func (s *Storage) updateNextDayMetricIDs() {
date := fasttime.UnixDate()
func (s *Storage) updateNextDayMetricIDs(date uint64) {
e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
s.pendingNextDayMetricIDsLock.Lock()
pendingMetricIDs := s.pendingNextDayMetricIDs
@ -2492,6 +2495,11 @@ func (s *Storage) updateNextDayMetricIDs() {
// Slow path: union pendingMetricIDs with e.v
if e.date == date {
pendingMetricIDs.Union(&e.v)
} else {
// Do not add pendingMetricIDs from the previous day to the cyrrent day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
pendingMetricIDs = &uint64set.Set{}
}
eNew := &byDateMetricIDEntry{
date: date,
@ -2500,14 +2508,13 @@ func (s *Storage) updateNextDayMetricIDs() {
s.nextDayMetricIDs.Store(eNew)
}
func (s *Storage) updateCurrHourMetricIDs() {
func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.pendingHourEntriesLock.Lock()
newEntries := append([]pendingHourMetricIDEntry{}, s.pendingHourEntries...)
s.pendingHourEntries = s.pendingHourEntries[:0]
s.pendingHourEntriesLock.Unlock()
hour := fasttime.UnixHour()
if len(newEntries) == 0 && hm.hour == hour {
// Fast path: nothing to update.
return
@ -2529,18 +2536,23 @@ func (s *Storage) updateCurrHourMetricIDs() {
isFull = true
}
for _, x := range newEntries {
m.Add(x.MetricID)
k := accountProjectKey{
AccountID: x.AccountID,
ProjectID: x.ProjectID,
if hour%24 != 0 {
// Do not add pending metricIDs from the previous hour to the current hour on the next day,
// since this may result in missing registration of the metricIDs in the per-day inverted index.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
for _, x := range newEntries {
m.Add(x.MetricID)
k := accountProjectKey{
AccountID: x.AccountID,
ProjectID: x.ProjectID,
}
e := byTenant[k]
if e == nil {
e = &uint64set.Set{}
byTenant[k] = e
}
e.Add(x.MetricID)
}
e := byTenant[k]
if e == nil {
e = &uint64set.Set{}
byTenant[k] = e
}
e.Add(x.MetricID)
}
hmNew := &hourMetricIDs{

View file

@ -11,6 +11,7 @@ import (
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
@ -154,7 +155,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
}
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
@ -162,7 +163,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
@ -189,7 +190,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
})
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
@ -197,7 +198,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
@ -249,7 +250,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
x.Add(e.MetricID)
}
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: 123,
@ -257,7 +258,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
@ -309,7 +310,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
x.Add(e.MetricID)
}
hour := uint64(timestampFromTime(time.Now())) / msecPerHour
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
@ -317,7 +318,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs()
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
// It is possible new hour occurred. Update the hour and verify it again.
@ -354,6 +355,43 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
s := newStorage()
hour := fasttime.UnixHour()
hour -= hour % 24
s.pendingHourEntries = []pendingHourMetricIDEntry{
{AccountID: 123, ProjectID: 431, MetricID: 343},
{AccountID: 123, ProjectID: 431, MetricID: 32424},
{AccountID: 1, ProjectID: 2, MetricID: 8293432},
}
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
}
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
}
byTenantExpected := make(map[accountProjectKey]*uint64set.Set)
if !reflect.DeepEqual(hmCurr.byTenant, byTenantExpected) {
t.Fatalf("unexpected hmPrev.byTenant; got %v; want %v", hmCurr.byTenant, byTenantExpected)
}
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
if len(s.pendingHourEntries) != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", len(s.pendingHourEntries), 0)
}
})
}
func TestMetricRowMarshalUnmarshal(t *testing.T) {