// Source: VictoriaMetrics/app/vmselect/promql/rollup_result_cache.go
// (650 lines, 21 KiB, Go)

package promql
import (
"crypto/rand"
"flag"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
var (
// cacheTimestampOffset holds the value of the -search.cacheTimestampOffset flag.
// rollupResultCache.Put subtracts it from the current time when deciding
// which trailing data points must not be stored in the cache.
cacheTimestampOffset = flag.Duration("search.cacheTimestampOffset", 5*time.Minute, "The maximum duration since the current time for response data, "+
"which is always queried from the original raw data, without using the response cache. Increase this value if you see gaps in responses "+
"due to time synchronization issues between VictoriaMetrics and data sources")
)
// rollupResultCacheV is the package-level rollup result cache instance.
//
// It is created with workingsetcache.New(1024 * 1024) at package initialization
// and is replaced by InitRollupResultCache.
var rollupResultCacheV = &rollupResultCache{
c: workingsetcache.New(1024 * 1024), // This is a cache for testing.
}
// rollupResultCachePath is the cachePath passed to InitRollupResultCache.
// An empty value means the cache is neither loaded from nor saved to disk.
var rollupResultCachePath string
// getRollupResultCacheSize returns the size to use for the rollupResult cache.
//
// The size is computed only once, on the first call: it is 1/8 of memory.Allowed(),
// falling back to 1024*1024 when that value isn't positive.
func getRollupResultCacheSize() int {
	rollupResultCacheSizeOnce.Do(func() {
		size := memory.Allowed() / 8
		if size > 0 {
			rollupResultCacheSize = size
			return
		}
		// Fall back to a small fixed size if the allowed memory is unknown or too small.
		rollupResultCacheSize = 1024 * 1024
	})
	return rollupResultCacheSize
}
var (
// rollupResultCacheSize holds the value computed by getRollupResultCacheSize on its first call.
rollupResultCacheSize int
// rollupResultCacheSizeOnce guards the one-time computation of rollupResultCacheSize.
rollupResultCacheSizeOnce sync.Once
)
// InitRollupResultCache sets up the package-level rollupResult cache.
//
// When cachePath is non-empty, the cache and its key prefix are loaded from that path;
// otherwise a fresh in-memory cache with a new random key prefix is created.
//
// ResetRollupResultCache must be called when the cache must be reset.
// StopRollupResultCache must be called when the cache isn't needed anymore.
func InitRollupResultCache(cachePath string) {
	rollupResultCachePath = cachePath
	startTime := time.Now()
	cacheSize := getRollupResultCacheSize()
	persistent := len(rollupResultCachePath) > 0

	var c *workingsetcache.Cache
	if !persistent {
		c = workingsetcache.New(cacheSize)
		rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
	} else {
		logger.Infof("loading rollupResult cache from %q...", rollupResultCachePath)
		c = workingsetcache.Load(rollupResultCachePath, cacheSize)
		mustLoadRollupResultCacheKeyPrefix(rollupResultCachePath)
	}
	if *disableCache {
		c.Reset()
	}

	// Cache stats are refreshed at most once per 2 seconds,
	// since every gauge below reads them independently.
	var (
		statsLock       sync.Mutex
		statsLastUpdate uint64
		stats           = &fastcache.Stats{}
	)
	getStats := func() *fastcache.Stats {
		statsLock.Lock()
		defer statsLock.Unlock()
		if fasttime.UnixTimestamp()-statsLastUpdate >= 2 {
			fresh := &fastcache.Stats{}
			c.UpdateStats(fresh)
			stats = fresh
			statsLastUpdate = fasttime.UnixTimestamp()
		}
		return stats
	}
	if persistent {
		logger.Infof("loaded rollupResult cache from %q in %.3f seconds; entriesCount: %d, sizeBytes: %d",
			rollupResultCachePath, time.Since(startTime).Seconds(), getStats().EntriesCount, getStats().BytesSize)
	}

	// Use metrics.GetOrCreateGauge instead of metrics.NewGauge,
	// so InitRollupResultCache+StopRollupResultCache could be called multiple times in tests.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2406
	gauges := []struct {
		name string
		read func(s *fastcache.Stats) float64
	}{
		{`vm_cache_entries{type="promql/rollupResult"}`, func(s *fastcache.Stats) float64 { return float64(s.EntriesCount) }},
		{`vm_cache_size_bytes{type="promql/rollupResult"}`, func(s *fastcache.Stats) float64 { return float64(s.BytesSize) }},
		{`vm_cache_size_max_bytes{type="promql/rollupResult"}`, func(s *fastcache.Stats) float64 { return float64(s.MaxBytesSize) }},
		{`vm_cache_requests_total{type="promql/rollupResult"}`, func(s *fastcache.Stats) float64 { return float64(s.GetCalls) }},
		{`vm_cache_misses_total{type="promql/rollupResult"}`, func(s *fastcache.Stats) float64 { return float64(s.Misses) }},
	}
	for i := range gauges {
		// Copy the entry, so every callback captures its own gauge definition.
		g := gauges[i]
		metrics.GetOrCreateGauge(g.name, func() float64 {
			return g.read(getStats())
		})
	}

	rollupResultCacheV = &rollupResultCache{
		c: c,
	}
}
// StopRollupResultCache closes the rollupResult cache.
func StopRollupResultCache() {
if len(rollupResultCachePath) == 0 {
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
return
}
logger.Infof("saving rollupResult cache to %q...", rollupResultCachePath)
startTime := time.Now()
if err := rollupResultCacheV.c.Save(rollupResultCachePath); err != nil {
logger.Errorf("cannot save rollupResult cache at %q: %s", rollupResultCachePath, err)
return
}
mustSaveRollupResultCacheKeyPrefix(rollupResultCachePath)
var fcs fastcache.Stats
rollupResultCacheV.c.UpdateStats(&fcs)
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
logger.Infof("saved rollupResult cache to %q in %.3f seconds; entriesCount: %d, sizeBytes: %d",
rollupResultCachePath, time.Since(startTime).Seconds(), fcs.EntriesCount, fcs.BytesSize)
}
// rollupResultCache stores rollup results (see the Get and Put methods)
// in a workingsetcache.Cache.
//
// TODO: convert this cache to distributed cache shared among vmselect
// instances in the cluster.
type rollupResultCache struct {
// c is the underlying cache. StopRollupResultCache sets it to nil after stopping it.
c *workingsetcache.Cache
}
// rollupResultCacheResets is incremented on every ResetRollupResultCache call.
var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="promql/rollupResult"}`)
// ResetRollupResultCache resets rollup result cache.
//
// The underlying cache isn't touched. Instead, rollupResultCacheKeyPrefix is
// atomically incremented; since the prefix is a part of every key built by
// marshalRollupResultCacheKey, keys built after the reset differ from the keys
// of the previously stored entries.
func ResetRollupResultCache() {
rollupResultCacheResets.Inc()
atomic.AddUint64(&rollupResultCacheKeyPrefix, 1)
logger.Infof("rollupResult cache has been cleared")
}
// Get returns cached series for the given expr, window and ec.
//
// On a cache hit it returns the series with points starting at ec.Start
// and newStart set to the timestamp following the last returned point
// (the last timestamp plus ec.Step).
//
// On a cache miss (cache disabled for ec, nothing stored, the stored data
// doesn't start exactly at ec.Start, etc.) it returns nil and ec.Start.
//
// qt is used for tracing the lookup steps.
func (rrc *rollupResultCache) Get(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) {
if qt.Enabled() {
query := string(expr.AppendString(nil))
query = bytesutil.LimitStringLen(query, 300)
qt = qt.NewChild("rollup cache get: query=%s, timeRange=%s, step=%d, window=%d", query, ec.timeRangeString(), ec.Step, window)
defer qt.Done()
}
if !ec.mayCache() {
qt.Printf("do not fetch series from cache, since it is disabled in the current context")
return nil, ec.Start
}
// Obtain tss from the cache.
// The lookup is two-level: the key built from the query properties points to metainfo,
// which lists the keys of the stored results together with their time ranges.
bb := bbPool.Get()
defer bbPool.Put(bb)
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
metainfoBuf := rrc.c.Get(nil, bb.B)
if len(metainfoBuf) == 0 {
qt.Printf("nothing found")
return nil, ec.Start
}
var mi rollupResultCacheMetainfo
if err := mi.Unmarshal(metainfoBuf); err != nil {
logger.Panicf("BUG: cannot unmarshal rollupResultCacheMetainfo: %s; it looks like it was improperly saved", err)
}
// The zero key is returned by GetBestKey when no entry starts at or before ec.Start.
key := mi.GetBestKey(ec.Start, ec.End)
if key.prefix == 0 && key.suffix == 0 {
qt.Printf("nothing found on the timeRange")
return nil, ec.Start
}
bb.B = key.Marshal(bb.B[:0])
compressedResultBuf := resultBufPool.Get()
defer resultBufPool.Put(compressedResultBuf)
compressedResultBuf.B = rrc.c.GetBig(compressedResultBuf.B[:0], bb.B)
if len(compressedResultBuf.B) == 0 {
// The metainfo refers to an entry, which is missing in the cache.
// Drop the stale key from the metainfo and store the updated metainfo back.
// bb.B holds the entry key at this point, so the metainfo key must be rebuilt.
mi.RemoveKey(key)
metainfoBuf = mi.Marshal(metainfoBuf[:0])
bb.B = marshalRollupResultCacheKey(bb.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
rrc.c.Set(bb.B, metainfoBuf)
qt.Printf("missing cache entry")
return nil, ec.Start
}
// Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast
// refers to the byte slice, so it cannot be returned to the resultBufPool.
qt.Printf("load compressed entry from cache with size %d bytes", len(compressedResultBuf.B))
resultBuf, err := encoding.DecompressZSTD(nil, compressedResultBuf.B)
if err != nil {
logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err)
}
qt.Printf("unpack the entry into %d bytes", len(resultBuf))
tss, err = unmarshalTimeseriesFast(resultBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err)
}
qt.Printf("unmarshal %d series", len(tss))
// Extract values for the matching timestamps
// The timestamps of the first series are used for locating the [i:j) range applied to all the series below.
// NOTE(review): this indexes tss[0] without a length check; Put never stores an empty tss - confirm unmarshalTimeseriesFast cannot return an empty result.
timestamps := tss[0].Timestamps
i := 0
for i < len(timestamps) && timestamps[i] < ec.Start {
i++
}
if i == len(timestamps) {
// no matches.
qt.Printf("no datapoints found in the cached series on the given timeRange")
return nil, ec.Start
}
if timestamps[i] != ec.Start {
// The cached range doesn't cover the requested range.
qt.Printf("cached series don't cover the given timeRange")
return nil, ec.Start
}
// Find j, the index following the last timestamp that doesn't exceed ec.End.
j := len(timestamps) - 1
for j >= 0 && timestamps[j] > ec.End {
j--
}
j++
if j <= i {
// no matches.
return nil, ec.Start
}
for _, ts := range tss {
ts.Timestamps = ts.Timestamps[i:j]
ts.Values = ts.Values[i:j]
}
// newStart is the first timestamp not covered by the returned series.
timestamps = tss[0].Timestamps
newStart = timestamps[len(timestamps)-1] + ec.Step
if qt.Enabled() {
startString := storage.TimestampToHumanReadableFormat(ec.Start)
endString := storage.TimestampToHumanReadableFormat(newStart - ec.Step)
qt.Printf("return %d series on a timeRange=[%s..%s]", len(tss), startString, endString)
}
return tss, newStart
}
// resultBufPool is a pool of byte buffers used by rollupResultCache.Get and rollupResultCache.Put
// for holding marshaled and compressed results.
var resultBufPool bytesutil.ByteBufferPool
// Put stores tss in the cache for the given expr, window and ec.
//
// Nothing is stored when tss is empty, when caching is disabled for ec,
// when all the points are newer than the deadline derived from ec.Step
// and -search.cacheTimestampOffset, when the time range of the points is already
// covered by a cached entry, or when the marshaled series exceed the size limit.
//
// tss isn't modified: if trailing points must be dropped, a shallow copy is trimmed instead.
// qt is used for tracing the steps.
func (rrc *rollupResultCache) Put(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) {
	if qt.Enabled() {
		query := string(expr.AppendString(nil))
		query = bytesutil.LimitStringLen(query, 300)
		qt = qt.NewChild("rollup cache put: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss))
		defer qt.Done()
	}
	if len(tss) == 0 || !ec.mayCache() {
		qt.Printf("do not store series to cache, since it is disabled in the current context")
		return
	}

	// Remove values up to currentTime - step - cacheTimestampOffset,
	// since these values may be added later.
	timestamps := tss[0].Timestamps
	deadline := (time.Now().UnixNano() / 1e6) - ec.Step - cacheTimestampOffset.Milliseconds()
	i := len(timestamps) - 1
	for i >= 0 && timestamps[i] > deadline {
		i--
	}
	i++
	if i == 0 {
		// Nothing to store in the cache.
		qt.Printf("nothing to store in the cache, since all the points have timestamps bigger than %d", deadline)
		return
	}
	if i < len(timestamps) {
		timestamps = timestamps[:i]
		// Make a copy of tss and remove unfit values
		rvs := copyTimeseriesShallow(tss)
		for _, ts := range rvs {
			ts.Timestamps = ts.Timestamps[:i]
			ts.Values = ts.Values[:i]
		}
		tss = rvs
	}

	// Store tss in the cache.
	// Load the existing metainfo first in order to check whether the time range is already cached.
	metainfoKey := bbPool.Get()
	defer bbPool.Put(metainfoKey)
	metainfoBuf := bbPool.Get()
	defer bbPool.Put(metainfoBuf)

	metainfoKey.B = marshalRollupResultCacheKey(metainfoKey.B[:0], ec.AuthToken, expr, window, ec.Step, ec.EnforcedTagFilterss)
	metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)
	var mi rollupResultCacheMetainfo
	if len(metainfoBuf.B) > 0 {
		if err := mi.Unmarshal(metainfoBuf.B); err != nil {
			logger.Panicf("BUG: cannot unmarshal rollupResultCacheMetainfo: %s; it looks like it was improperly saved", err)
		}
	}
	start := timestamps[0]
	end := timestamps[len(timestamps)-1]
	if mi.CoversTimeRange(start, end) {
		if qt.Enabled() {
			startString := storage.TimestampToHumanReadableFormat(start)
			endString := storage.TimestampToHumanReadableFormat(end)
			qt.Printf("series on the given timeRange=[%s..%s] already exist in the cache", startString, endString)
		}
		return
	}

	// A single entry is limited to 1/4 of the cache size.
	maxMarshaledSize := getRollupResultCacheSize() / 4
	resultBuf := resultBufPool.Get()
	defer resultBufPool.Put(resultBuf)
	resultBuf.B = marshalTimeseriesFast(resultBuf.B[:0], tss, maxMarshaledSize, ec.Step)
	if len(resultBuf.B) == 0 {
		tooBigRollupResults.Inc()
		qt.Printf("cannot store series in the cache, since they would occupy more than %d bytes", maxMarshaledSize)
		return
	}
	if qt.Enabled() {
		startString := storage.TimestampToHumanReadableFormat(start)
		endString := storage.TimestampToHumanReadableFormat(end)
		qt.Printf("marshal %d series on a timeRange=[%s..%s] into %d bytes", len(tss), startString, endString, len(resultBuf.B))
	}
	compressedResultBuf := resultBufPool.Get()
	defer resultBufPool.Put(compressedResultBuf)
	compressedResultBuf.B = encoding.CompressZSTDLevel(compressedResultBuf.B[:0], resultBuf.B, 1)
	qt.Printf("compress %d bytes into %d bytes", len(resultBuf.B), len(compressedResultBuf.B))

	// Store the compressed result under a newly generated unique key.
	var key rollupResultCacheKey
	// The prefix is updated with atomic.AddUint64 in ResetRollupResultCache,
	// so it must be read atomically in order to avoid a data race.
	key.prefix = atomic.LoadUint64(&rollupResultCacheKeyPrefix)
	key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1)
	rollupResultKey := key.Marshal(nil)
	rrc.c.SetBig(rollupResultKey, compressedResultBuf.B)
	qt.Printf("store %d bytes in the cache", len(compressedResultBuf.B))

	// Register the new key in the metainfo, so Get can find the stored result.
	mi.AddKey(key, start, end)
	metainfoBuf.B = mi.Marshal(metainfoBuf.B[:0])
	rrc.c.Set(metainfoKey.B, metainfoBuf.B)
}
var (
// rollupResultCacheKeyPrefix is a part of every cache key.
// It is set during InitRollupResultCache (either loaded from file or randomly generated)
// and is atomically incremented by ResetRollupResultCache.
rollupResultCacheKeyPrefix uint64
// rollupResultCacheKeySuffix is atomically incremented by rollupResultCache.Put
// for generating the suffix of every newly stored entry key.
// It is seeded with the current time in nanoseconds.
rollupResultCacheKeySuffix = uint64(time.Now().UnixNano())
)
// newRollupResultCacheKeyPrefix returns a key prefix built from 8 random bytes
// obtained from crypto/rand.
//
// It panics if the random bytes cannot be read.
func newRollupResultCacheKeyPrefix() uint64 {
	var randomBytes [8]byte
	_, err := rand.Read(randomBytes[:])
	if err != nil {
		// do not use logger.Panicf, since it isn't initialized yet.
		panic(fmt.Errorf("FATAL: cannot read random data for rollupResultCacheKeyPrefix: %w", err))
	}
	prefix := encoding.UnmarshalUint64(randomBytes[:])
	return prefix
}
// mustLoadRollupResultCacheKeyPrefix initializes rollupResultCacheKeyPrefix
// from the file at path + ".key.prefix".
//
// A new random prefix is generated if the file is missing, cannot be read
// or doesn't contain exactly 8 bytes.
func mustLoadRollupResultCacheKeyPrefix(path string) {
	prefixPath := path + ".key.prefix"
	if !fs.IsPathExist(prefixPath) {
		rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
		return
	}

	data, err := os.ReadFile(prefixPath)
	switch {
	case err != nil:
		logger.Errorf("cannot load %s: %s; reset rollupResult cache", prefixPath, err)
	case len(data) != 8:
		logger.Errorf("unexpected size of %s; want 8 bytes; got %d bytes; reset rollupResult cache", prefixPath, len(data))
	default:
		rollupResultCacheKeyPrefix = encoding.UnmarshalUint64(data)
		return
	}
	// The stored prefix is unusable - switch to a new one.
	rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
}
// mustSaveRollupResultCacheKeyPrefix writes the current rollupResultCacheKeyPrefix
// to the file at path + ".key.prefix".
//
// The failure to write the file is reported via logger.Fatalf.
func mustSaveRollupResultCacheKeyPrefix(path string) {
	prefixPath := path + ".key.prefix"
	prefixData := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
	err := fs.WriteFileAtomically(prefixPath, prefixData, true)
	if err != nil {
		logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", prefixPath, err)
	}
}
// tooBigRollupResults is incremented by rollupResultCache.Put when the marshaled series
// cannot be stored in the cache because of the size limit.
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes.
//
// The version is the first byte of every marshaled cache key.
const rollupResultCacheVersion = 9
// marshalRollupResultCacheKey appends the metainfo cache key to dst and returns the result.
//
// The key consists of the cache version, the current key prefix, the tenant
// (at.AccountID and at.ProjectID), window, step, the string representation of expr
// and the marshaled etfs, where the groups of tag filters are delimited by '|'.
func marshalRollupResultCacheKey(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
	dst = append(dst, rollupResultCacheVersion)
	// The prefix is updated with atomic.AddUint64 in ResetRollupResultCache,
	// so it must be read atomically in order to avoid a data race.
	dst = encoding.MarshalUint64(dst, atomic.LoadUint64(&rollupResultCacheKeyPrefix))
	dst = encoding.MarshalUint32(dst, at.AccountID)
	dst = encoding.MarshalUint32(dst, at.ProjectID)
	dst = encoding.MarshalInt64(dst, window)
	dst = encoding.MarshalInt64(dst, step)
	dst = expr.AppendString(dst)
	for i, etf := range etfs {
		for _, f := range etf {
			dst = f.Marshal(dst)
		}
		// Put the delimiter only between groups of filters.
		if i+1 < len(etfs) {
			dst = append(dst, '|')
		}
	}
	return dst
}
// mergeTimeseries appends the values from b to the values of the matching series from a
// and returns the resulting series, which share the timestamps obtained from ec.
//
// Series are matched by their sorted marshaled metric names. The series present
// only in b get NaN values on the [ec.Start ... bStart) part, while the series present
// only in a get NaN values on the [bStart ... ec.End] part.
//
// Preconditions:
// - a mustn't intersect with b.
// - a timestamps must be smaller than b timestamps.
//
// Postconditions:
// - a and b cannot be used after returning from the call.
func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timeseries {
	sharedTimestamps := ec.getSharedTimestamps()
	if bStart == ec.Start {
		// Nothing to merge - b covers all the time range.
		// Verify b is correct.
		for _, ts := range b {
			ts.denyReuse = true
			ts.Timestamps = sharedTimestamps
			if len(ts.Values) != len(ts.Timestamps) {
				logger.Panicf("BUG: unexpected number of values in b; got %d; want %d", len(ts.Values), len(ts.Timestamps))
			}
		}
		return b
	}

	// newResult returns an empty series with the shared timestamps and pre-allocated values.
	newResult := func() *timeseries {
		return &timeseries{
			denyReuse:  true,
			Timestamps: sharedTimestamps,
			Values:     make([]float64, 0, len(sharedTimestamps)),
		}
	}

	// Index series from a by their metric names.
	nameBuf := bbPool.Get()
	defer bbPool.Put(nameBuf)
	aByName := make(map[string]*timeseries, len(a))
	for _, tsA := range a {
		nameBuf.B = marshalMetricNameSorted(nameBuf.B[:0], &tsA.MetricName)
		aByName[bytesutil.InternBytes(nameBuf.B)] = tsA
	}

	merged := make([]*timeseries, 0, len(a))
	for _, tsB := range b {
		dst := newResult()
		dst.MetricName.MoveFrom(&tsB.MetricName)
		nameBuf.B = marshalMetricNameSorted(nameBuf.B[:0], &dst.MetricName)
		name := bytesutil.InternBytes(nameBuf.B)
		if tsA := aByName[name]; tsA != nil {
			dst.Values = append(dst.Values, tsA.Values...)
			delete(aByName, name)
		} else {
			// There is no matching series in a - fill the gap before bStart with NaNs.
			for t := ec.Start; t < bStart; t += ec.Step {
				dst.Values = append(dst.Values, nan)
			}
		}
		dst.Values = append(dst.Values, tsB.Values...)
		if len(dst.Values) != len(dst.Timestamps) {
			logger.Panicf("BUG: unexpected values after merging new values; got %d; want %d", len(dst.Values), len(dst.Timestamps))
		}
		merged = append(merged, dst)
	}

	// Copy the remaining timeseries from a, which have no matching series in b.
	for _, tsA := range aByName {
		dst := newResult()
		dst.MetricName.MoveFrom(&tsA.MetricName)
		dst.Values = append(dst.Values, tsA.Values...)
		// Fill the part starting from bStart with NaNs.
		for t := bStart; t <= ec.End; t += ec.Step {
			dst.Values = append(dst.Values, nan)
		}
		if len(dst.Values) != len(dst.Timestamps) {
			logger.Panicf("BUG: unexpected values in the result after adding cached values; got %d; want %d", len(dst.Values), len(dst.Timestamps))
		}
		merged = append(merged, dst)
	}
	return merged
}
// rollupResultCacheMetainfo lists the cached results stored for a single metainfo key
// (see marshalRollupResultCacheKey).
type rollupResultCacheMetainfo struct {
// entries contains the time range and the cache key for every stored result.
// New entries are appended by AddKey.
entries []rollupResultCacheMetainfoEntry
}
// Marshal appends the marshaled mi to dst and returns the result.
//
// The format is the number of entries as uint32 followed by the marshaled entries.
func (mi *rollupResultCacheMetainfo) Marshal(dst []byte) []byte {
	entriesCount := len(mi.entries)
	dst = encoding.MarshalUint32(dst, uint32(entriesCount))
	for i := 0; i < entriesCount; i++ {
		entry := &mi.entries[i]
		dst = entry.Marshal(dst)
	}
	return dst
}
// Unmarshal unmarshals mi from src, which must be produced by Marshal.
//
// The existing mi.entries capacity is reused when possible.
// An error is returned if src is too short or if it contains
// unexpected data after the last entry.
func (mi *rollupResultCacheMetainfo) Unmarshal(src []byte) error {
	if len(src) < 4 {
		return fmt.Errorf("cannot unmarshal len(entries) from %d bytes; need at least %d bytes", len(src), 4)
	}
	entriesLen := int(encoding.UnmarshalUint32(src))
	src = src[4:]

	// Grow mi.entries only when its capacity is insufficient for entriesLen items.
	if n := entriesLen - cap(mi.entries); n > 0 {
		mi.entries = append(mi.entries[:cap(mi.entries)], make([]rollupResultCacheMetainfoEntry, n)...)
	}
	mi.entries = mi.entries[:entriesLen]
	for i := 0; i < entriesLen; i++ {
		tail, err := mi.entries[i].Unmarshal(src)
		if err != nil {
			return fmt.Errorf("cannot unmarshal entry #%d: %w", i, err)
		}
		src = tail
	}
	if len(src) > 0 {
		return fmt.Errorf("unexpected non-empty tail left; len(tail)=%d", len(src))
	}
	return nil
}
// CoversTimeRange reports whether a single entry in mi fully contains
// the [start ... end] time range.
//
// It panics via logger.Panicf if start exceeds end.
func (mi *rollupResultCacheMetainfo) CoversTimeRange(start, end int64) bool {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	for idx := 0; idx < len(mi.entries); idx++ {
		entry := &mi.entries[idx]
		if entry.start > start || entry.end < end {
			// The entry doesn't contain the whole range.
			continue
		}
		return true
	}
	return false
}
// GetBestKey returns the key of the entry, which starts at or before start
// and covers the longest part of the [start ... end] time range.
// The latest suitable entry wins when multiple entries cover the same duration.
//
// The zero key is returned when no entry starts at or before start.
// It panics via logger.Panicf if start exceeds end.
func (mi *rollupResultCacheMetainfo) GetBestKey(start, end int64) rollupResultCacheKey {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	var (
		best        rollupResultCacheKey
		bestCovered int64
	)
	for idx := 0; idx < len(mi.entries); idx++ {
		entry := &mi.entries[idx]
		if entry.start > start {
			// The entry doesn't contain data at the start of the range.
			continue
		}
		// Only the part up to end is useful for the requested range.
		coveredEnd := entry.end
		if end <= coveredEnd {
			coveredEnd = end
		}
		covered := coveredEnd - start
		if covered < bestCovered {
			continue
		}
		bestCovered = covered
		best = entry.key
	}
	return best
}
// AddKey registers key for the [start ... end] time range in mi.
//
// When the number of entries exceeds 30 after the addition, the 10 oldest entries are dropped.
// It panics via logger.Panicf if start exceeds end.
func (mi *rollupResultCacheMetainfo) AddKey(key rollupResultCacheKey, start, end int64) {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	entry := rollupResultCacheMetainfoEntry{
		start: start,
		end:   end,
		key:   key,
	}
	mi.entries = append(mi.entries, entry)
	if len(mi.entries) <= 30 {
		return
	}
	// Remove old entries by shifting the newer ones to the beginning of the slice.
	kept := copy(mi.entries, mi.entries[10:])
	mi.entries = mi.entries[:kept]
}
// RemoveKey drops the first entry with the given key from mi.
// It does nothing if there is no such entry.
func (mi *rollupResultCacheMetainfo) RemoveKey(key rollupResultCacheKey) {
	entries := mi.entries
	for idx := 0; idx < len(entries); idx++ {
		if entries[idx].key != key {
			continue
		}
		mi.entries = append(entries[:idx], entries[idx+1:]...)
		return
	}
}
// rollupResultCacheMetainfoEntry describes a single cached result:
// the time range it covers and the key it is stored under.
type rollupResultCacheMetainfoEntry struct {
// start is the first timestamp of the cached result.
start int64
// end is the last timestamp of the cached result.
end int64
// key is the cache key of the stored result.
key rollupResultCacheKey
}
// Marshal appends the marshaled mie to dst and returns the result.
//
// The fields are written in the following order: start, end, key prefix, key suffix.
func (mie *rollupResultCacheMetainfoEntry) Marshal(dst []byte) []byte {
	out := encoding.MarshalInt64(dst, mie.start)
	out = encoding.MarshalInt64(out, mie.end)
	key := &mie.key
	out = encoding.MarshalUint64(out, key.prefix)
	return encoding.MarshalUint64(out, key.suffix)
}
// Unmarshal unmarshals mie from the beginning of src and returns the remaining tail.
//
// The fields are read in the following order: start, end, key prefix, key suffix.
// Every field occupies 8 bytes. An error is returned if src is too short.
func (mie *rollupResultCacheMetainfoEntry) Unmarshal(src []byte) ([]byte, error) {
	const fieldSize = 8
	// errTooShort reports that the current src tail is too short for the given field.
	errTooShort := func(field string) error {
		return fmt.Errorf("cannot unmarshal %s from %d bytes; need at least %d bytes", field, len(src), fieldSize)
	}

	if len(src) < fieldSize {
		return src, errTooShort("start")
	}
	mie.start = encoding.UnmarshalInt64(src)
	src = src[fieldSize:]

	if len(src) < fieldSize {
		return src, errTooShort("end")
	}
	mie.end = encoding.UnmarshalInt64(src)
	src = src[fieldSize:]

	if len(src) < fieldSize {
		return src, errTooShort("key prefix")
	}
	mie.key.prefix = encoding.UnmarshalUint64(src)
	src = src[fieldSize:]

	if len(src) < fieldSize {
		return src, errTooShort("key suffix")
	}
	mie.key.suffix = encoding.UnmarshalUint64(src)
	src = src[fieldSize:]

	return src, nil
}
// rollupResultCacheKey must be globally unique across vmselect nodes,
// so it has prefix and suffix.
//
// The zero key is treated as "not found" by rollupResultCache.Get.
type rollupResultCacheKey struct {
// prefix is set to rollupResultCacheKeyPrefix by rollupResultCache.Put.
prefix uint64
// suffix is set to the incremented rollupResultCacheKeySuffix by rollupResultCache.Put.
suffix uint64
}
// Marshal appends the marshaled k to dst and returns the result.
//
// The format is the cache version byte followed by the prefix and the suffix.
func (k *rollupResultCacheKey) Marshal(dst []byte) []byte {
	out := append(dst, rollupResultCacheVersion)
	out = encoding.MarshalUint64(out, k.prefix)
	return encoding.MarshalUint64(out, k.suffix)
}