diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f0cb0e72af..60217e2380 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -16,7 +16,9 @@ The following tip changes can be tested by building VictoriaMetrics components f
 ## tip
 
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
+
 * BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).
+* BUGFIX: reduce CPU usage when the regex-based relabeling rules are applied to more than 100K unique Graphite metrics. See [this issue](https://docs.victoriametrics.com/CHANGELOG.html#v1820). The issue was introduced in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820).
 
 ## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0)
 
diff --git a/lib/bytesutil/fast_string_matcher.go b/lib/bytesutil/fast_string_matcher.go
index fdbc2b7e21..d2933563bf 100644
--- a/lib/bytesutil/fast_string_matcher.go
+++ b/lib/bytesutil/fast_string_matcher.go
@@ -4,6 +4,8 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // FastStringMatcher implements fast matcher for strings.
@@ -11,44 +13,68 @@ import (
 // It caches string match results and returns them back on the next calls
 // without calling the matchFunc, which may be expensive.
 type FastStringMatcher struct {
-	m    atomic.Value
-	mLen uint64
+	lastCleanupTime uint64
+
+	m sync.Map
 
 	matchFunc func(s string) bool
 }
 
+type fsmEntry struct {
+	lastAccessTime uint64
+	ok             bool
+}
+
 // NewFastStringMatcher creates new matcher, which applies matchFunc to strings passed to Match()
 //
 // matchFunc must return the same result for the same input.
 func NewFastStringMatcher(matchFunc func(s string) bool) *FastStringMatcher {
-	var fsm FastStringMatcher
-	fsm.m.Store(&sync.Map{})
-	fsm.matchFunc = matchFunc
-	return &fsm
+	return &FastStringMatcher{
+		lastCleanupTime: fasttime.UnixTimestamp(),
+		matchFunc:       matchFunc,
+	}
 }
 
 // Match applies matchFunc to s and returns the result.
 func (fsm *FastStringMatcher) Match(s string) bool {
-	m := fsm.m.Load().(*sync.Map)
-	v, ok := m.Load(s)
+	ct := fasttime.UnixTimestamp()
+	v, ok := fsm.m.Load(s)
 	if ok {
 		// Fast path - s match result is found in the cache.
-		bp := v.(*bool)
-		return *bp
+		e := v.(*fsmEntry)
+		if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
+			// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
+			// in order to improve the fast path speed on systems with many CPU cores.
+			atomic.StoreUint64(&e.lastAccessTime, ct)
+		}
+		return e.ok
 	}
 	// Slow path - run matchFunc for s and store the result in the cache.
 	b := fsm.matchFunc(s)
-	bp := &b
+	e := &fsmEntry{
+		lastAccessTime: ct,
+		ok:             b,
+	}
 	// Make a copy of s in order to limit memory usage to the s length,
 	// since the s may point to bigger string.
 	// This also protects from the case when s contains unsafe string, which points to a temporary byte slice.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3227
 	s = strings.Clone(s)
-	m.Store(s, bp)
-	n := atomic.AddUint64(&fsm.mLen, 1)
-	if n > 100e3 {
-		atomic.StoreUint64(&fsm.mLen, 0)
-		fsm.m.Store(&sync.Map{})
+	fsm.m.Store(s, e)
+
+	if atomic.LoadUint64(&fsm.lastCleanupTime)+61 < ct {
+		// Perform a global cleanup for fsm.m by removing items, which weren't accessed
+		// during the last 5 minutes.
+		atomic.StoreUint64(&fsm.lastCleanupTime, ct)
+		m := &fsm.m
+		m.Range(func(k, v interface{}) bool {
+			e := v.(*fsmEntry)
+			if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
+				m.Delete(k)
+			}
+			return true
+		})
 	}
+
 	return b
 }
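For context before the next file: the rewritten FastStringMatcher above is fed an expensive matchFunc (typically a compiled relabeling regexp), which now runs once per unique input; repeats are served from the sync.Map cache until an entry sits idle for 5 minutes, instead of the whole cache being dropped once it crosses 100e3 entries. A minimal usage sketch, assuming the module import path github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil; the regexp and metric names are invented for illustration:

package main

import (
	"fmt"
	"regexp"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
	// Hypothetical relabeling regexp; any expensive matchFunc behaves the same way.
	re := regexp.MustCompile(`^foo\.[^.]+\.bar$`)
	fsm := bytesutil.NewFastStringMatcher(re.MatchString)

	// The first call per unique string runs the regexp (slow path);
	// later calls return the cached fsmEntry result (fast path).
	fmt.Println(fsm.Match("foo.host1.bar")) // true
	fmt.Println(fsm.Match("foo.host1.bar")) // true, from cache
	fmt.Println(fsm.Match("abc.host1.bar")) // false, also cached
}

The idle-based eviction is what addresses the Graphite relabeling case from the changelog entry: with more than 100K unique metric names, the old counter-based reset kept discarding and rebuilding the cache, forcing the regexps to be re-evaluated.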
diff --git a/lib/bytesutil/fast_string_transformer.go b/lib/bytesutil/fast_string_transformer.go
index 2dfa31af01..c131d15e85 100644
--- a/lib/bytesutil/fast_string_transformer.go
+++ b/lib/bytesutil/fast_string_transformer.go
@@ -4,6 +4,8 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // FastStringTransformer implements fast transformer for strings.
@@ -11,30 +13,41 @@ import (
 // It caches transformed strings and returns them back on the next calls
 // without calling the transformFunc, which may be expensive.
 type FastStringTransformer struct {
-	m    atomic.Value
-	mLen uint64
+	lastCleanupTime uint64
+
+	m sync.Map
 
 	transformFunc func(s string) string
 }
 
+type fstEntry struct {
+	lastAccessTime uint64
+	s              string
+}
+
 // NewFastStringTransformer creates new transformer, which applies transformFunc to strings passed to Transform()
 //
 // transformFunc must return the same result for the same input.
 func NewFastStringTransformer(transformFunc func(s string) string) *FastStringTransformer {
-	var fst FastStringTransformer
-	fst.m.Store(&sync.Map{})
-	fst.transformFunc = transformFunc
-	return &fst
+	return &FastStringTransformer{
+		lastCleanupTime: fasttime.UnixTimestamp(),
+		transformFunc:   transformFunc,
+	}
 }
 
 // Transform applies transformFunc to s and returns the result.
 func (fst *FastStringTransformer) Transform(s string) string {
-	m := fst.m.Load().(*sync.Map)
-	v, ok := m.Load(s)
+	ct := fasttime.UnixTimestamp()
+	v, ok := fst.m.Load(s)
 	if ok {
 		// Fast path - the transformed s is found in the cache.
-		sp := v.(*string)
-		return *sp
+		e := v.(*fstEntry)
+		if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
+			// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
+			// in order to improve the fast path speed on systems with many CPU cores.
+			atomic.StoreUint64(&e.lastAccessTime, ct)
+		}
+		return e.s
 	}
 	// Slow path - transform s and store it in the cache.
 	sTransformed := fst.transformFunc(s)
@@ -48,12 +61,25 @@ func (fst *FastStringTransformer) Transform(s string) string {
 		// which, in turn, can point to bigger string.
 		sTransformed = s
 	}
-	sp := &sTransformed
-	m.Store(s, sp)
-	n := atomic.AddUint64(&fst.mLen, 1)
-	if n > 100e3 {
-		atomic.StoreUint64(&fst.mLen, 0)
-		fst.m.Store(&sync.Map{})
+	e := &fstEntry{
+		lastAccessTime: ct,
+		s:              sTransformed,
 	}
+	fst.m.Store(s, e)
+
+	if atomic.LoadUint64(&fst.lastCleanupTime)+61 < ct {
+		// Perform a global cleanup for fst.m by removing items, which weren't accessed
+		// during the last 5 minutes.
+		atomic.StoreUint64(&fst.lastCleanupTime, ct)
+		m := &fst.m
+		m.Range(func(k, v interface{}) bool {
+			e := v.(*fstEntry)
+			if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
+				m.Delete(k)
+			}
+			return true
+		})
+	}
+
 	return sTransformed
 }
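FastStringTransformer above follows the same entry/cleanup scheme as FastStringMatcher, caching the transformed string instead of a boolean. A sketch of typical usage with an invented "replace" rule, under the same import-path assumption as the previous example:

package main

import (
	"fmt"
	"regexp"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
	// Hypothetical rule: collapse the numeric node in a Graphite-style name.
	re := regexp.MustCompile(`\.\d+\.`)
	fst := bytesutil.NewFastStringTransformer(func(s string) string {
		return re.ReplaceAllString(s, ".N.")
	})

	// The transformation is computed once per unique input and then reused
	// from the cache until the entry goes unused for 5 minutes.
	fmt.Println(fst.Transform("requests.1234.count")) // requests.N.count
	fmt.Println(fst.Transform("requests.1234.count")) // requests.N.count, from cache
}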
diff --git a/lib/bytesutil/internstring.go b/lib/bytesutil/internstring.go
index 9318bc980e..45c9a63285 100644
--- a/lib/bytesutil/internstring.go
+++ b/lib/bytesutil/internstring.go
@@ -1,35 +1,58 @@
 package bytesutil
 
 import (
+	"strings"
 	"sync"
 	"sync/atomic"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 )
 
 // InternString returns interned s.
 //
 // This may be needed for reducing the amounts of allocated memory.
 func InternString(s string) string {
-	m := internStringsMap.Load().(*sync.Map)
-	if v, ok := m.Load(s); ok {
-		sp := v.(*string)
-		return *sp
+	ct := fasttime.UnixTimestamp()
+	if v, ok := internStringsMap.Load(s); ok {
+		e := v.(*ismEntry)
+		if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
+			// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
+			// in order to improve the fast path speed on systems with many CPU cores.
+			atomic.StoreUint64(&e.lastAccessTime, ct)
+		}
+		return e.s
 	}
 	// Make a new copy for s in order to remove references from possible bigger string s refers to.
-	sCopy := string(append([]byte{}, s...))
-	m.Store(sCopy, &sCopy)
-	n := atomic.AddUint64(&internStringsMapLen, 1)
-	if n > 100e3 {
-		atomic.StoreUint64(&internStringsMapLen, 0)
-		internStringsMap.Store(&sync.Map{})
+	sCopy := strings.Clone(s)
+	e := &ismEntry{
+		lastAccessTime: ct,
+		s:              sCopy,
 	}
+	internStringsMap.Store(sCopy, e)
+
+	if atomic.LoadUint64(&internStringsMapLastCleanupTime)+61 < ct {
+		// Perform a global cleanup for internStringsMap by removing items, which weren't accessed
+		// during the last 5 minutes.
+		atomic.StoreUint64(&internStringsMapLastCleanupTime, ct)
+		m := &internStringsMap
+		m.Range(func(k, v interface{}) bool {
+			e := v.(*ismEntry)
+			if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
+				m.Delete(k)
+			}
+			return true
+		})
+	}
+
 	return sCopy
 }
 
-var (
-	internStringsMap    atomic.Value
-	internStringsMapLen uint64
-)
-
-func init() {
-	internStringsMap.Store(&sync.Map{})
+type ismEntry struct {
+	lastAccessTime uint64
+	s              string
 }
+
+var (
+	internStringsMap                sync.Map
+	internStringsMapLastCleanupTime uint64
+)
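Finally, the reworked InternString: it now stores an ismEntry per interned string and makes the detached copy with strings.Clone rather than string(append([]byte{}, s...)), while keeping the same contract. A short sketch of why callers intern at all, with a synthetic buffer standing in for a large ingestion payload (import path assumed as above):

package main

import (
	"fmt"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

func main() {
	// Synthetic stand-in for a big ingestion buffer; a label value sliced out of it
	// keeps the whole buffer reachable for as long as the slice is alive.
	buf := strings.Repeat("x", 1<<20) + ",job=node_exporter"
	labelValue := buf[len(buf)-len("node_exporter"):]

	v1 := bytesutil.InternString(labelValue)      // stores a detached copy of "node_exporter"
	v2 := bytesutil.InternString("node_exporter") // returns the same cached copy

	fmt.Println(v1 == v2)          // true
	fmt.Println(len(v1), len(buf)) // 13 1048594 - v1 no longer pins buf
}

As with the two caches above, interned entries that go unused for 5 minutes are dropped during the periodic cleanup, so the map no longer has to be thrown away wholesale after 100e3 insertions.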