mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/bytesutil: cache results for all the input strings, which were passed during the last 5 minutes from FastStringMatcher.Match(), FastStringTransformer.Transform() and InternString()
Previously only up to 100K results were cached. This could result in sub-optimal performance when more than 100K unique strings were actually used. For example, when the relabeling rule was applied to a million of unique Graphite metric names like in the https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3466 This commit should reduce the long-term CPU usage for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3466 after all the unique Graphite metrics are registered in the FastStringMatcher.Transform() cache. It is expected that the number of unique strings, which are passed to FastStringMatcher.Match(), FastStringTransformer.Transform() and to InternString() during the last 5 minutes, is limited, so the function results fit memory. Otherwise OOM crash can occur. This should be the case for typical production workloads.
This commit is contained in:
parent
600023a8f3
commit
f3e5c9c246
4 changed files with 126 additions and 49 deletions
|
@ -16,7 +16,9 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
## tip
|
## tip
|
||||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
|
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support `$for` or `.For` template variables in alert's annotations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3246).
|
||||||
|
|
||||||
|
|
||||||
* BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).
|
* BUGFIX: [DataDog protocol parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): do not re-use `host` and `device` fields from the previously parsed messages if these fields are missing in the currently parsed message. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3432).
|
||||||
|
* BUGFIX: reduce CPU usage when the regex-based relabeling rules are applied to more than 100K unique Graphite metrics. See [this issue](https://docs.victoriametrics.com/CHANGELOG.html#v1820). The issue was introduced in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820).
|
||||||
|
|
||||||
|
|
||||||
## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0)
|
## [v1.85.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.85.0)
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FastStringMatcher implements fast matcher for strings.
|
// FastStringMatcher implements fast matcher for strings.
|
||||||
|
@ -11,44 +13,68 @@ import (
|
||||||
// It caches string match results and returns them back on the next calls
|
// It caches string match results and returns them back on the next calls
|
||||||
// without calling the matchFunc, which may be expensive.
|
// without calling the matchFunc, which may be expensive.
|
||||||
type FastStringMatcher struct {
|
type FastStringMatcher struct {
|
||||||
m atomic.Value
|
lastCleanupTime uint64
|
||||||
mLen uint64
|
|
||||||
|
m sync.Map
|
||||||
|
|
||||||
matchFunc func(s string) bool
|
matchFunc func(s string) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fsmEntry struct {
|
||||||
|
lastAccessTime uint64
|
||||||
|
ok bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewFastStringMatcher creates new matcher, which applies matchFunc to strings passed to Match()
|
// NewFastStringMatcher creates new matcher, which applies matchFunc to strings passed to Match()
|
||||||
//
|
//
|
||||||
// matchFunc must return the same result for the same input.
|
// matchFunc must return the same result for the same input.
|
||||||
func NewFastStringMatcher(matchFunc func(s string) bool) *FastStringMatcher {
|
func NewFastStringMatcher(matchFunc func(s string) bool) *FastStringMatcher {
|
||||||
var fsm FastStringMatcher
|
return &FastStringMatcher{
|
||||||
fsm.m.Store(&sync.Map{})
|
lastCleanupTime: fasttime.UnixTimestamp(),
|
||||||
fsm.matchFunc = matchFunc
|
matchFunc: matchFunc,
|
||||||
return &fsm
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Match applies matchFunc to s and returns the result.
|
// Match applies matchFunc to s and returns the result.
|
||||||
func (fsm *FastStringMatcher) Match(s string) bool {
|
func (fsm *FastStringMatcher) Match(s string) bool {
|
||||||
m := fsm.m.Load().(*sync.Map)
|
ct := fasttime.UnixTimestamp()
|
||||||
v, ok := m.Load(s)
|
v, ok := fsm.m.Load(s)
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path - s match result is found in the cache.
|
// Fast path - s match result is found in the cache.
|
||||||
bp := v.(*bool)
|
e := v.(*fsmEntry)
|
||||||
return *bp
|
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
|
||||||
|
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
|
||||||
|
// in order to improve the fast path speed on systems with many CPU cores.
|
||||||
|
atomic.StoreUint64(&e.lastAccessTime, ct)
|
||||||
|
}
|
||||||
|
return e.ok
|
||||||
}
|
}
|
||||||
// Slow path - run matchFunc for s and store the result in the cache.
|
// Slow path - run matchFunc for s and store the result in the cache.
|
||||||
b := fsm.matchFunc(s)
|
b := fsm.matchFunc(s)
|
||||||
bp := &b
|
e := &fsmEntry{
|
||||||
|
lastAccessTime: ct,
|
||||||
|
ok: b,
|
||||||
|
}
|
||||||
// Make a copy of s in order to limit memory usage to the s length,
|
// Make a copy of s in order to limit memory usage to the s length,
|
||||||
// since the s may point to bigger string.
|
// since the s may point to bigger string.
|
||||||
// This also protects from the case when s contains unsafe string, which points to a temporary byte slice.
|
// This also protects from the case when s contains unsafe string, which points to a temporary byte slice.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3227
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3227
|
||||||
s = strings.Clone(s)
|
s = strings.Clone(s)
|
||||||
m.Store(s, bp)
|
fsm.m.Store(s, e)
|
||||||
n := atomic.AddUint64(&fsm.mLen, 1)
|
|
||||||
if n > 100e3 {
|
if atomic.LoadUint64(&fsm.lastCleanupTime)+61 < ct {
|
||||||
atomic.StoreUint64(&fsm.mLen, 0)
|
// Perform a global cleanup for fsm.m by removing items, which weren't accessed
|
||||||
fsm.m.Store(&sync.Map{})
|
// during the last 5 minutes.
|
||||||
|
atomic.StoreUint64(&fsm.lastCleanupTime, ct)
|
||||||
|
m := &fsm.m
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
e := v.(*fsmEntry)
|
||||||
|
if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
|
||||||
|
m.Delete(k)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FastStringTransformer implements fast transformer for strings.
|
// FastStringTransformer implements fast transformer for strings.
|
||||||
|
@ -11,30 +13,41 @@ import (
|
||||||
// It caches transformed strings and returns them back on the next calls
|
// It caches transformed strings and returns them back on the next calls
|
||||||
// without calling the transformFunc, which may be expensive.
|
// without calling the transformFunc, which may be expensive.
|
||||||
type FastStringTransformer struct {
|
type FastStringTransformer struct {
|
||||||
m atomic.Value
|
lastCleanupTime uint64
|
||||||
mLen uint64
|
|
||||||
|
m sync.Map
|
||||||
|
|
||||||
transformFunc func(s string) string
|
transformFunc func(s string) string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fstEntry struct {
|
||||||
|
lastAccessTime uint64
|
||||||
|
s string
|
||||||
|
}
|
||||||
|
|
||||||
// NewFastStringTransformer creates new transformer, which applies transformFunc to strings passed to Transform()
|
// NewFastStringTransformer creates new transformer, which applies transformFunc to strings passed to Transform()
|
||||||
//
|
//
|
||||||
// transformFunc must return the same result for the same input.
|
// transformFunc must return the same result for the same input.
|
||||||
func NewFastStringTransformer(transformFunc func(s string) string) *FastStringTransformer {
|
func NewFastStringTransformer(transformFunc func(s string) string) *FastStringTransformer {
|
||||||
var fst FastStringTransformer
|
return &FastStringTransformer{
|
||||||
fst.m.Store(&sync.Map{})
|
lastCleanupTime: fasttime.UnixTimestamp(),
|
||||||
fst.transformFunc = transformFunc
|
transformFunc: transformFunc,
|
||||||
return &fst
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transform applies transformFunc to s and returns the result.
|
// Transform applies transformFunc to s and returns the result.
|
||||||
func (fst *FastStringTransformer) Transform(s string) string {
|
func (fst *FastStringTransformer) Transform(s string) string {
|
||||||
m := fst.m.Load().(*sync.Map)
|
ct := fasttime.UnixTimestamp()
|
||||||
v, ok := m.Load(s)
|
v, ok := fst.m.Load(s)
|
||||||
if ok {
|
if ok {
|
||||||
// Fast path - the transformed s is found in the cache.
|
// Fast path - the transformed s is found in the cache.
|
||||||
sp := v.(*string)
|
e := v.(*fstEntry)
|
||||||
return *sp
|
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
|
||||||
|
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
|
||||||
|
// in order to improve the fast path speed on systems with many CPU cores.
|
||||||
|
atomic.StoreUint64(&e.lastAccessTime, ct)
|
||||||
|
}
|
||||||
|
return e.s
|
||||||
}
|
}
|
||||||
// Slow path - transform s and store it in the cache.
|
// Slow path - transform s and store it in the cache.
|
||||||
sTransformed := fst.transformFunc(s)
|
sTransformed := fst.transformFunc(s)
|
||||||
|
@ -48,12 +61,25 @@ func (fst *FastStringTransformer) Transform(s string) string {
|
||||||
// which, in turn, can point to bigger string.
|
// which, in turn, can point to bigger string.
|
||||||
sTransformed = s
|
sTransformed = s
|
||||||
}
|
}
|
||||||
sp := &sTransformed
|
e := &fstEntry{
|
||||||
m.Store(s, sp)
|
lastAccessTime: ct,
|
||||||
n := atomic.AddUint64(&fst.mLen, 1)
|
s: sTransformed,
|
||||||
if n > 100e3 {
|
|
||||||
atomic.StoreUint64(&fst.mLen, 0)
|
|
||||||
fst.m.Store(&sync.Map{})
|
|
||||||
}
|
}
|
||||||
|
fst.m.Store(s, e)
|
||||||
|
|
||||||
|
if atomic.LoadUint64(&fst.lastCleanupTime)+61 < ct {
|
||||||
|
// Perform a global cleanup for fst.m by removing items, which weren't accessed
|
||||||
|
// during the last 5 minutes.
|
||||||
|
atomic.StoreUint64(&fst.lastCleanupTime, ct)
|
||||||
|
m := &fst.m
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
e := v.(*fstEntry)
|
||||||
|
if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
|
||||||
|
m.Delete(k)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return sTransformed
|
return sTransformed
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,35 +1,58 @@
|
||||||
package bytesutil
|
package bytesutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InternString returns interned s.
|
// InternString returns interned s.
|
||||||
//
|
//
|
||||||
// This may be needed for reducing the amounts of allocated memory.
|
// This may be needed for reducing the amounts of allocated memory.
|
||||||
func InternString(s string) string {
|
func InternString(s string) string {
|
||||||
m := internStringsMap.Load().(*sync.Map)
|
ct := fasttime.UnixTimestamp()
|
||||||
if v, ok := m.Load(s); ok {
|
if v, ok := internStringsMap.Load(s); ok {
|
||||||
sp := v.(*string)
|
e := v.(*ismEntry)
|
||||||
return *sp
|
if atomic.LoadUint64(&e.lastAccessTime)+10 < ct {
|
||||||
|
// Reduce the frequency of e.lastAccessTime update to once per 10 seconds
|
||||||
|
// in order to improve the fast path speed on systems with many CPU cores.
|
||||||
|
atomic.StoreUint64(&e.lastAccessTime, ct)
|
||||||
|
}
|
||||||
|
return e.s
|
||||||
}
|
}
|
||||||
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
// Make a new copy for s in order to remove references from possible bigger string s refers to.
|
||||||
sCopy := string(append([]byte{}, s...))
|
sCopy := strings.Clone(s)
|
||||||
m.Store(sCopy, &sCopy)
|
e := &ismEntry{
|
||||||
n := atomic.AddUint64(&internStringsMapLen, 1)
|
lastAccessTime: ct,
|
||||||
if n > 100e3 {
|
s: sCopy,
|
||||||
atomic.StoreUint64(&internStringsMapLen, 0)
|
|
||||||
internStringsMap.Store(&sync.Map{})
|
|
||||||
}
|
}
|
||||||
|
internStringsMap.Store(sCopy, e)
|
||||||
|
|
||||||
|
if atomic.LoadUint64(&internStringsMapLastCleanupTime)+61 < ct {
|
||||||
|
// Perform a global cleanup for internStringsMap by removing items, which weren't accessed
|
||||||
|
// during the last 5 minutes.
|
||||||
|
atomic.StoreUint64(&internStringsMapLastCleanupTime, ct)
|
||||||
|
m := &internStringsMap
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
e := v.(*ismEntry)
|
||||||
|
if atomic.LoadUint64(&e.lastAccessTime)+5*60 < ct {
|
||||||
|
m.Delete(k)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return sCopy
|
return sCopy
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
type ismEntry struct {
|
||||||
internStringsMap atomic.Value
|
lastAccessTime uint64
|
||||||
internStringsMapLen uint64
|
s string
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
internStringsMap.Store(&sync.Map{})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
internStringsMap sync.Map
|
||||||
|
internStringsMapLastCleanupTime uint64
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue