diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1d6371ee8..3aef080f2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: properly register new time series in per-day inverted index if they were ingested during the last 10 seconds of the day. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309). Thanks to @lmarszal for the bugreport and for the [initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3320). + ## [v1.83.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.83.0) diff --git a/go.mod b/go.mod index d71134e4b..91f258313 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/oklog/ulid v1.3.1 github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9 - github.com/urfave/cli/v2 v2.23.2 + github.com/urfave/cli/v2 v2.23.4 github.com/valyala/fastjson v1.6.3 github.com/valyala/fastrand v1.1.0 github.com/valyala/fasttemplate v1.2.2 diff --git a/go.sum b/go.sum index 92ddbba2a..f0a2434bf 100644 --- a/go.sum +++ b/go.sum @@ -834,8 +834,8 @@ github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/urfave/cli/v2 v2.23.2 h1:34bT/FlchakhE5j+PggFYUfiGZBnrNxJRRVB9AQODOo= -github.com/urfave/cli/v2 v2.23.2/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= +github.com/urfave/cli/v2 v2.23.4 h1:gcaHwki8kGX6lfp2zz7irxu7eZkcIl1Xapt6XW0Ynqc= +github.com/urfave/cli/v2 v2.23.4/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus= diff --git a/lib/envtemplate/envtemplate.go b/lib/envtemplate/envtemplate.go index 9d06435bf..6ad779d63 100644 --- a/lib/envtemplate/envtemplate.go +++ b/lib/envtemplate/envtemplate.go @@ -5,6 +5,7 @@ import ( "io" "log" "os" + "regexp" "strings" "github.com/valyala/fasttemplate" @@ -93,14 +94,23 @@ func expand(m map[string]string, s string) (string, error) { return s, nil } result, err := fasttemplate.ExecuteFuncStringWithErr(s, "%{", "}", func(w io.Writer, tag string) (int, error) { + if !isValidEnvVarName(tag) { + return fmt.Fprintf(w, "%%{%s}", tag) + } v, ok := m[tag] if !ok { return 0, fmt.Errorf("missing %q env var", tag) } - return w.Write([]byte(v)) + return fmt.Fprintf(w, "%s", v) }) if err != nil { return "", err } return result, nil } + +func isValidEnvVarName(s string) bool { + return envVarNameRegex.MatchString(s) +} + +var envVarNameRegex = regexp.MustCompile("^[a-zA-Z0-9_]+$") diff --git a/lib/envtemplate/envtemplate_test.go b/lib/envtemplate/envtemplate_test.go index 375ece944..cdd7b9ab3 100644 --- a/lib/envtemplate/envtemplate_test.go +++ b/lib/envtemplate/envtemplate_test.go @@ -24,6 +24,7 @@ func TestExpandTemplates(t *testing.T) { f([]string{"foo=%{bar}", "bar=x"}, []string{"bar=x", "foo=x"}) f([]string{"a=x%{b}", "b=y%{c}z%{d}", "c=123", "d=qwe"}, []string{"a=xy123zqwe", "b=y123zqwe", "c=123", "d=qwe"}) f([]string{"a=x%{b}y", "b=z%{a}q", "c"}, []string{"a=xzxzxzxz%{a}qyqyqyqy", "b=zxzxzxzx%{b}yqyqyqyq", "c="}) + f([]string{"a=%{x.y}"}, []string{"a=%{x.y}"}) } func TestLookupEnv(t *testing.T) { @@ -70,6 +71,7 @@ func TestReplaceSuccess(t *testing.T) { f("", "") f("foo", "foo") f("a %{foo}-x", "a bar-x") + f("%{foo.bar}", "%{foo.bar}") } func TestReplaceFailure(t *testing.T) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dc732cf9a..016830cb6 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -700,10 +700,12 @@ func (s *Storage) currHourMetricIDsUpdater() { for { select { case <-s.stop: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) return case <-ticker.C: - s.updateCurrHourMetricIDs() + hour := fasttime.UnixHour() + s.updateCurrHourMetricIDs(hour) } } } @@ -716,10 +718,12 @@ func (s *Storage) nextDayMetricIDsUpdater() { for { select { case <-s.stop: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) return case <-ticker.C: - s.updateNextDayMetricIDs() + date := fasttime.UnixDate() + s.updateNextDayMetricIDs(date) } } } @@ -895,14 +899,12 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs logger.Panicf("FATAL: cannot read %s: %s", path, err) } srcOrigLen := len(src) - if len(src) < 24 { - logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) + if len(src) < 16 { + logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16) return hm } // Unmarshal header - isFull := encoding.UnmarshalUint64(src) - src = src[8:] hourLoaded := encoding.UnmarshalUint64(src) src = src[8:] if hourLoaded != hour { @@ -921,7 +923,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs return hm } hm.m = m - hm.isFull = isFull != 0 logger.Infof("loaded %s from %q in %.3f seconds; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime).Seconds(), m.Len(), srcOrigLen) return hm } @@ -950,13 +951,8 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { logger.Infof("saving %s to %q...", name, path) startTime := time.Now() dst := make([]byte, 0, hm.m.Len()*8+24) - isFull := uint64(0) - if hm.isFull { - isFull = 1 - } // Marshal header - dst = encoding.MarshalUint64(dst, isFull) dst = encoding.MarshalUint64(dst, hm.hour) // Marshal hm.m @@ -2298,8 +2294,7 @@ type byDateMetricIDEntry struct { v uint64set.Set } -func (s *Storage) updateNextDayMetricIDs() { - date := fasttime.UnixDate() +func (s *Storage) updateNextDayMetricIDs(date uint64) { e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry) s.pendingNextDayMetricIDsLock.Lock() pendingMetricIDs := s.pendingNextDayMetricIDs @@ -2313,6 +2308,11 @@ func (s *Storage) updateNextDayMetricIDs() { // Slow path: union pendingMetricIDs with e.v if e.date == date { pendingMetricIDs.Union(&e.v) + } else { + // Do not add pendingMetricIDs from the previous day to the cyrrent day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + pendingMetricIDs = &uint64set.Set{} } eNew := &byDateMetricIDEntry{ date: date, @@ -2321,14 +2321,13 @@ func (s *Storage) updateNextDayMetricIDs() { s.nextDayMetricIDs.Store(eNew) } -func (s *Storage) updateCurrHourMetricIDs() { +func (s *Storage) updateCurrHourMetricIDs(hour uint64) { hm := s.currHourMetricIDs.Load().(*hourMetricIDs) s.pendingHourEntriesLock.Lock() newMetricIDs := s.pendingHourEntries s.pendingHourEntries = &uint64set.Set{} s.pendingHourEntriesLock.Unlock() - hour := fasttime.UnixHour() if newMetricIDs.Len() == 0 && hm.hour == hour { // Fast path: nothing to update. return @@ -2336,18 +2335,20 @@ func (s *Storage) updateCurrHourMetricIDs() { // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set - isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() } else { m = &uint64set.Set{} - isFull = true } - m.Union(newMetricIDs) + if hour%24 != 0 { + // Do not add pending metricIDs from the previous hour to the current hour on the next day, + // since this may result in missing registration of the metricIDs in the per-day inverted index. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309 + m.Union(newMetricIDs) + } hmNew := &hourMetricIDs{ - m: m, - hour: hour, - isFull: isFull, + m: m, + hour: hour, } s.currHourMetricIDs.Store(hmNew) if hm.hour != hour { @@ -2356,9 +2357,8 @@ func (s *Storage) updateCurrHourMetricIDs() { } type hourMetricIDs struct { - m *uint64set.Set - hour uint64 - isFull bool + m *uint64set.Set + hour uint64 } type generationTSID struct { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index d9cae957a..1a01c48ea 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -11,6 +11,7 @@ import ( "testing/quick" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) @@ -155,7 +156,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { } t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: 123, @@ -163,21 +164,14 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if hmCurr.m.Len() != 0 { t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0) } - if !hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) if !reflect.DeepEqual(hmPrev, hmOrig) { @@ -190,7 +184,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { }) t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) { s := newStorage() - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -198,23 +192,14 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } - // Do not run other checks, since they may fail. - return + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if !reflect.DeepEqual(hmCurr, hmOrig) { t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig) } - if hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmEmpty := &hourMetricIDs{} @@ -234,7 +219,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { pendingHourEntries.Add(8293432) s.pendingHourEntries = pendingHourEntries - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: 123, @@ -242,21 +227,14 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } if !hmCurr.m.Equal(pendingHourEntries) { t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries) } - if !hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) if !reflect.DeepEqual(hmPrev, hmOrig) { @@ -275,7 +253,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { pendingHourEntries.Add(8293432) s.pendingHourEntries = pendingHourEntries - hour := uint64(timestampFromTime(time.Now())) / msecPerHour + hour := fasttime.UnixHour() hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, hour: hour, @@ -283,16 +261,10 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hmOrig.m.Add(12) hmOrig.m.Add(34) s.currHourMetricIDs.Store(hmOrig) - s.updateCurrHourMetricIDs() + s.updateCurrHourMetricIDs(hour) hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) if hmCurr.hour != hour { - // It is possible new hour occurred. Update the hour and verify it again. - hour = uint64(timestampFromTime(time.Now())) / msecPerHour - if hmCurr.hour != hour { - t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) - } - // Do not run other checks, since they may fail. - return + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) } m := pendingHourEntries.Clone() hmOrig.m.ForEach(func(part []uint64) bool { @@ -304,9 +276,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { if !hmCurr.m.Equal(m) { t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m) } - if hmCurr.isFull { - t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false) - } hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) hmEmpty := &hourMetricIDs{} @@ -318,6 +287,39 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) } }) + t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) { + s := newStorage() + + hour := fasttime.UnixHour() + hour -= hour % 24 + + pendingHourEntries := &uint64set.Set{} + pendingHourEntries.Add(343) + pendingHourEntries.Add(32424) + pendingHourEntries.Add(8293432) + s.pendingHourEntries = pendingHourEntries + + hmOrig := &hourMetricIDs{ + m: &uint64set.Set{}, + hour: hour - 1, + } + s.currHourMetricIDs.Store(hmOrig) + s.updateCurrHourMetricIDs(hour) + hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs) + if hmCurr.hour != hour { + t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour) + } + if hmCurr.m.Len() != 0 { + t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil)) + } + hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs) + if !reflect.DeepEqual(hmPrev, hmOrig) { + t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig) + } + if s.pendingHourEntries.Len() != 0 { + t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0) + } + }) } func TestMetricRowMarshalUnmarshal(t *testing.T) { diff --git a/vendor/github.com/urfave/cli/v2/godoc-current.txt b/vendor/github.com/urfave/cli/v2/godoc-current.txt index 54424fb1e..b8dbf6ad0 100644 --- a/vendor/github.com/urfave/cli/v2/godoc-current.txt +++ b/vendor/github.com/urfave/cli/v2/godoc-current.txt @@ -2437,6 +2437,7 @@ type InputSourceContext interface { String(name string) (string, error) StringSlice(name string) ([]string, error) IntSlice(name string) ([]int, error) + Int64Slice(name string) ([]int64, error) Generic(name string) (cli.Generic, error) Bool(name string) (bool, error) @@ -2494,6 +2495,9 @@ func (f *Int64SliceFlag) Apply(set *flag.FlagSet) error Apply saves the flagSet for later usage calls, then calls the wrapped Int64SliceFlag.Apply +func (f *Int64SliceFlag) ApplyInputSourceValue(cCtx *cli.Context, isc InputSourceContext) error + ApplyInputSourceValue applies a Int64Slice value if required + type IntFlag struct { *cli.IntFlag // Has unexported fields. @@ -2554,6 +2558,10 @@ func (fsm *MapInputSource) Generic(name string) (cli.Generic, error) func (fsm *MapInputSource) Int(name string) (int, error) Int returns an int from the map if it exists otherwise returns 0 +func (fsm *MapInputSource) Int64Slice(name string) ([]int64, error) + Int64Slice returns an []int64 from the map if it exists otherwise returns + nil + func (fsm *MapInputSource) IntSlice(name string) ([]int, error) IntSlice returns an []int from the map if it exists otherwise returns nil diff --git a/vendor/github.com/urfave/cli/v2/help.go b/vendor/github.com/urfave/cli/v2/help.go index 6dc593b34..2ccd3b71e 100644 --- a/vendor/github.com/urfave/cli/v2/help.go +++ b/vendor/github.com/urfave/cli/v2/help.go @@ -246,7 +246,7 @@ func ShowCommandHelp(ctx *Context, command string) error { } for _, c := range commands { if c.HasName(command) { - if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 { + if !ctx.App.HideHelpCommand && !c.HasName(helpName) && len(c.Subcommands) != 0 && c.Command(helpName) == nil { c.Subcommands = append(c.Subcommands, helpCommandDontUse) } if !ctx.App.HideHelp && HelpFlag != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index fb4eb4a2a..4846f630e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -333,7 +333,7 @@ github.com/rivo/uniseg # github.com/russross/blackfriday/v2 v2.1.0 ## explicit github.com/russross/blackfriday/v2 -# github.com/urfave/cli/v2 v2.23.2 +# github.com/urfave/cli/v2 v2.23.4 ## explicit; go 1.18 github.com/urfave/cli/v2 # github.com/valyala/bytebufferpool v1.0.0