VictoriaMetrics/app/vmselect/promql/rollup_result_cache.go
Aliaksandr Valialkin 2cbdb1db22
app/vmselect/promql: properly handle duplicate series when merging cached results with the results obtained from the database
evalRollupFuncNoCache() may return time series with identical labels (aka duplicate series)
when performing queries satisfying all the following conditions:

- It must select time series with multiple metric names. For example, {__name__=~"foo|bar"}
- The series selector must be wrapped into rollup function, which drops metric names. For example, rate({__name__=~"foo|bar"})
- The rollup function must be wrapped into aggregate function, which has no streaming optimization.
  For example, quantile(0.9, rate({__name__=~"foo|bar"}))

In this case VictoriaMetrics shouldn't return `cannot merge series: duplicate series found` error.
Instead, it should fall back to query execution with disabled cache.

Also properly store the merged results. Previously they were incorrectly stored because of a typo
introduced in the commit 41a0fdaf39

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5332
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5337
2023-11-16 16:01:40 +01:00

854 lines
28 KiB
Go

package promql
import (
"crypto/rand"
"flag"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
// Command-line flags controlling how the rollup result cache treats recently ingested data.
var (
// cacheTimestampOffset defines the window before the current time whose samples are never stored
// in the rollup result cache (see PutSeries) and whose late arrival may trigger a cache reset
// (see ResetRollupResultCacheIfNeeded).
cacheTimestampOffset = flag.Duration("search.cacheTimestampOffset", 5*time.Minute, "The maximum duration since the current time for response data, "+
"which is always queried from the original raw data, without using the response cache. Increase this value if you see gaps in responses "+
"due to time synchronization issues between VictoriaMetrics and data sources. See also -search.disableAutoCacheReset")
// disableAutoCacheReset turns ResetRollupResultCacheIfNeeded into a no-op.
disableAutoCacheReset = flag.Bool("search.disableAutoCacheReset", false, "Whether to disable automatic response cache reset if a sample with timestamp "+
"outside -search.cacheTimestampOffset is inserted into VictoriaMetrics")
)
// ResetRollupResultCacheIfNeeded resets rollup result cache if mrs contains timestamps outside `now - search.cacheTimestampOffset`.
//
// The reset is not performed synchronously: this function only remembers the offending sample
// and raises needRollupResultCacheReset. The background checkRollupResultCacheReset goroutine
// is started lazily on the first call and performs the actual reset.
func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) {
	if *disableAutoCacheReset {
		// Do not reset response cache if -search.disableAutoCacheReset is set.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1570 .
		return
	}
	checkRollupResultCacheResetOnce.Do(func() {
		rollupResultResetMetricRowSample.Store(&storage.MetricRow{})
		go checkRollupResultCacheReset()
	})

	// The check interval is added, since the reset happens up to one interval later.
	nowMsecs := int64(fasttime.UnixTimestamp() * 1000)
	threshold := nowMsecs - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds()
	for idx := range mrs {
		if mrs[idx].Timestamp >= threshold {
			continue
		}
		// Remember the offending sample, so it can be logged when the cache is reset.
		sample := &storage.MetricRow{}
		sample.CopyFrom(&mrs[idx])
		rollupResultResetMetricRowSample.Store(sample)
		// Do not call ResetRollupResultCache() here, since it may be heavy when frequently called.
		atomic.StoreUint32(&needRollupResultCacheReset, 1)
		return
	}
}
// checkRollupResultCacheReset never returns. Every checkRollupResultCacheResetInterval it
// atomically consumes the needRollupResultCacheReset flag. When the flag was set, it logs the
// sample that triggered the reset and calls ResetRollupResultCache.
func checkRollupResultCacheReset() {
	for {
		time.Sleep(checkRollupResultCacheResetInterval)
		if atomic.SwapUint32(&needRollupResultCacheReset, 0) == 0 {
			// No reset was requested since the previous check.
			continue
		}
		sample := rollupResultResetMetricRowSample.Load()
		lagMsecs := int64(fasttime.UnixTimestamp()*1000) - sample.Timestamp - cacheTimestampOffset.Milliseconds()
		logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs",
			sample.String(), cacheTimestampOffset, float64(lagMsecs)/1e3)
		ResetRollupResultCache()
	}
}
// checkRollupResultCacheResetInterval is the polling interval of checkRollupResultCacheReset.
const checkRollupResultCacheResetInterval = 5 * time.Second
// needRollupResultCacheReset is set to 1 by ResetRollupResultCacheIfNeeded and atomically
// swapped back to 0 by checkRollupResultCacheReset.
var needRollupResultCacheReset uint32
// checkRollupResultCacheResetOnce guards the lazy start of the checkRollupResultCacheReset goroutine.
var checkRollupResultCacheResetOnce sync.Once
// rollupResultResetMetricRowSample holds the last sample which requested a cache reset; it is only used for logging.
var rollupResultResetMetricRowSample atomic.Pointer[storage.MetricRow]
// rollupResultCacheV is the global rollup result cache.
// It starts as a small 1MiB cache and is replaced by InitRollupResultCache.
var rollupResultCacheV = &rollupResultCache{
c: workingsetcache.New(1024 * 1024), // This is a cache for testing.
}
// rollupResultCachePath is the path passed to InitRollupResultCache; empty means the cache isn't persisted.
var rollupResultCachePath string
// getRollupResultCacheSize returns the maximum size in bytes of the rollup result cache.
//
// The size is computed only once, as 1/16 of memory.Allowed(). It falls back to 1MiB when
// that value isn't positive.
func getRollupResultCacheSize() int {
	rollupResultCacheSizeOnce.Do(func() {
		size := memory.Allowed() / 16
		if size > 0 {
			rollupResultCacheSize = size
			return
		}
		rollupResultCacheSize = 1024 * 1024
	})
	return rollupResultCacheSize
}

var (
	// rollupResultCacheSize holds the value computed by getRollupResultCacheSize.
	rollupResultCacheSize int
	// rollupResultCacheSizeOnce guards the one-time computation of rollupResultCacheSize.
	rollupResultCacheSizeOnce sync.Once
)
// InitRollupResultCache initializes the rollupResult cache
//
// if cachePath is empty, then the cache isn't stored to persistent disk.
//
// ResetRollupResultCache must be called when the cache must be reset.
// StopRollupResultCache must be called when the cache isn't needed anymore.
func InitRollupResultCache(cachePath string) {
	rollupResultCachePath = cachePath
	startTime := time.Now()
	cacheSize := getRollupResultCacheSize()
	isPersistent := len(rollupResultCachePath) > 0

	// Either restore the cache and its key prefix from disk,
	// or start with an empty cache and a random key prefix.
	var c *workingsetcache.Cache
	if isPersistent {
		logger.Infof("loading rollupResult cache from %q...", rollupResultCachePath)
		c = workingsetcache.Load(rollupResultCachePath, cacheSize)
		mustLoadRollupResultCacheKeyPrefix(rollupResultCachePath)
	} else {
		c = workingsetcache.New(cacheSize)
		rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
	}
	if *disableCache {
		c.Reset()
	}

	// getStats returns the cache stats. It refreshes them via c.UpdateStats at most once every
	// 2 seconds, so the gauges below don't call UpdateStats on every read.
	var (
		statsLock       sync.Mutex
		statsLastUpdate uint64
		cachedStats     = &fastcache.Stats{}
	)
	getStats := func() *fastcache.Stats {
		statsLock.Lock()
		defer statsLock.Unlock()
		if fasttime.UnixTimestamp()-statsLastUpdate >= 2 {
			var fresh fastcache.Stats
			c.UpdateStats(&fresh)
			cachedStats = &fresh
			statsLastUpdate = fasttime.UnixTimestamp()
		}
		return cachedStats
	}

	if isPersistent {
		logger.Infof("loaded rollupResult cache from %q in %.3f seconds; entriesCount: %d, sizeBytes: %d",
			rollupResultCachePath, time.Since(startTime).Seconds(), getStats().EntriesCount, getStats().BytesSize)
	}

	// Use metrics.GetOrCreateGauge instead of metrics.NewGauge,
	// so InitRollupResultCache+StopRollupResultCache could be called multiple times in tests.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2406
	cacheGauges := []struct {
		name  string
		value func() float64
	}{
		{`vm_cache_entries{type="promql/rollupResult"}`, func() float64 { return float64(getStats().EntriesCount) }},
		{`vm_cache_size_bytes{type="promql/rollupResult"}`, func() float64 { return float64(getStats().BytesSize) }},
		{`vm_cache_size_max_bytes{type="promql/rollupResult"}`, func() float64 { return float64(getStats().MaxBytesSize) }},
		{`vm_cache_requests_total{type="promql/rollupResult"}`, func() float64 { return float64(getStats().GetCalls) }},
		{`vm_cache_misses_total{type="promql/rollupResult"}`, func() float64 { return float64(getStats().Misses) }},
	}
	for _, g := range cacheGauges {
		metrics.GetOrCreateGauge(g.name, g.value)
	}

	rollupResultCacheV = &rollupResultCache{
		c: c,
	}
}
// StopRollupResultCache closes the rollupResult cache.
func StopRollupResultCache() {
if len(rollupResultCachePath) == 0 {
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
return
}
logger.Infof("saving rollupResult cache to %q...", rollupResultCachePath)
startTime := time.Now()
if err := rollupResultCacheV.c.Save(rollupResultCachePath); err != nil {
logger.Errorf("cannot save rollupResult cache at %q: %s", rollupResultCachePath, err)
return
}
mustSaveRollupResultCacheKeyPrefix(rollupResultCachePath)
var fcs fastcache.Stats
rollupResultCacheV.c.UpdateStats(&fcs)
rollupResultCacheV.c.Stop()
rollupResultCacheV.c = nil
logger.Infof("saved rollupResult cache to %q in %.3f seconds; entriesCount: %d, sizeBytes: %d",
rollupResultCachePath, time.Since(startTime).Seconds(), fcs.EntriesCount, fcs.BytesSize)
}
// rollupResultCache caches rollup results: series for range queries (GetSeries/PutSeries)
// and values for instant queries (GetInstantValues/PutInstantValues).
type rollupResultCache struct {
// c holds both the small metainfo values (Get/Set) and the compressed series (GetBig/SetBig).
c *workingsetcache.Cache
}
// rollupResultCacheResets counts ResetRollupResultCache calls.
var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="promql/rollupResult"}`)

// ResetRollupResultCache resets rollup result cache.
//
// Cached data is not deleted. Instead, the key prefix embedded in every cache key is
// incremented, so entries stored under the old prefix can no longer be found.
func ResetRollupResultCache() {
	atomic.AddUint64(&rollupResultCacheKeyPrefix, 1)
	rollupResultCacheResets.Inc()
	logger.Infof("rollupResult cache has been cleared")
}
// GetInstantValues returns the cached instant values for expr evaluated with the given window,
// step and extra tag filters etfss.
//
// nil is returned if nothing is cached for this combination or the cached list is empty.
func (rrc *rollupResultCache) GetInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) []*timeseries {
	if qt.Enabled() {
		queryStr := stringsutil.LimitStringLen(string(expr.AppendString(nil)), 300)
		qt = qt.NewChild("rollup cache get instant values: query=%s, window=%d, step=%d", queryStr, window, step)
		defer qt.Done()
	}

	// Obtain instant values from the cache
	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)
	keyBuf.B = marshalRollupResultCacheKeyForInstantValues(keyBuf.B[:0], expr, window, step, etfss)

	tss, found := rrc.getSeriesFromCache(qt, keyBuf.B)
	if found && len(tss) > 0 {
		assertInstantValues(tss)
		qt.Printf("found %d series for time=%s", len(tss), storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0]))
		return tss
	}
	return nil
}
// PutInstantValues stores tss as the instant values for expr evaluated with the given window,
// step and extra tag filters etfss.
//
// Empty lists are not stored. A failure to store (e.g. a too big result) is ignored,
// because it only leads to a later cache miss.
func (rrc *rollupResultCache) PutInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter, tss []*timeseries) {
	if qt.Enabled() {
		queryStr := stringsutil.LimitStringLen(string(expr.AppendString(nil)), 300)
		var startStr string
		if len(tss) > 0 {
			startStr = storage.TimestampToHumanReadableFormat(tss[0].Timestamps[0])
		}
		qt = qt.NewChild("rollup cache put instant values: query=%s, window=%d, step=%d, series=%d, time=%s", queryStr, window, step, len(tss), startStr)
		defer qt.Done()
	}
	if len(tss) == 0 {
		qt.Printf("do not cache empty series list")
		return
	}
	assertInstantValues(tss)

	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)
	keyBuf.B = marshalRollupResultCacheKeyForInstantValues(keyBuf.B[:0], expr, window, step, etfss)
	_ = rrc.putSeriesToCache(qt, keyBuf.B, step, tss)
}
// DeleteInstantValues overwrites the cached instant values for expr evaluated with the given
// window, step and extra tag filters etfss with an empty series list.
// GetInstantValues returns nil for an empty cached list.
//
// It panics if the empty list cannot be stored.
func (rrc *rollupResultCache) DeleteInstantValues(qt *querytracer.Tracer, expr metricsql.Expr, window, step int64, etfss [][]storage.TagFilter) {
	keyBuf := bbPool.Get()
	defer bbPool.Put(keyBuf)
	keyBuf.B = marshalRollupResultCacheKeyForInstantValues(keyBuf.B[:0], expr, window, step, etfss)

	if stored := rrc.putSeriesToCache(qt, keyBuf.B, step, nil); !stored {
		logger.Panicf("BUG: cannot store zero series to cache")
	}

	if !qt.Enabled() {
		return
	}
	queryStr := stringsutil.LimitStringLen(string(expr.AppendString(nil)), 300)
	qt.Printf("rollup result cache delete instant values: query=%s, window=%d, step=%d", queryStr, window, step)
}
// GetSeries returns the cached series for expr with the given window on the time range [ec.Start ... ec.End].
//
// The returned series hold samples on [ec.Start ... newStart-ec.Step]. The caller must compute
// the remaining samples starting from newStart. If nothing usable is cached, nil and ec.Start
// are returned.
func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64) (tss []*timeseries, newStart int64) {
	if qt.Enabled() {
		query := string(expr.AppendString(nil))
		query = stringsutil.LimitStringLen(query, 300)
		qt = qt.NewChild("rollup cache get series: query=%s, timeRange=%s, window=%d, step=%d", query, ec.timeRangeString(), window, ec.Step)
		defer qt.Done()
	}

	// Obtain the metainfo, which lists the cached entries for expr and the time ranges they cover.
	bb := bbPool.Get()
	defer bbPool.Put(bb)
	bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
	metainfoBuf := rrc.c.Get(nil, bb.B)
	if len(metainfoBuf) == 0 {
		qt.Printf("nothing found")
		return nil, ec.Start
	}
	var mi rollupResultCacheMetainfo
	if err := mi.Unmarshal(metainfoBuf); err != nil {
		logger.Panicf("BUG: cannot unmarshal rollupResultCacheMetainfo: %s; it looks like it was improperly saved", err)
	}
	key := mi.GetBestKey(ec.Start, ec.End)
	if key.prefix == 0 && key.suffix == 0 {
		qt.Printf("nothing found on the timeRange")
		return nil, ec.Start
	}

	// Obtain tss from the cache.
	var ok bool
	bb.B = key.Marshal(bb.B[:0])
	tss, ok = rrc.getSeriesFromCache(qt, bb.B)
	if !ok {
		// The entry referenced by the metainfo is missing from the cache.
		// Drop the stale reference, so it isn't looked up again.
		mi.RemoveKey(key)
		metainfoBuf = mi.Marshal(metainfoBuf[:0])
		bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
		rrc.c.Set(bb.B, metainfoBuf)
		return nil, ec.Start
	}
	if len(tss) == 0 {
		// The code below reads tss[0], so an empty cached entry must be treated as a cache miss
		// instead of panicking with index out of range.
		qt.Printf("empty series list found in the cache")
		return nil, ec.Start
	}

	// Extract values for the matching timestamps
	timestamps := tss[0].Timestamps
	i := 0
	for i < len(timestamps) && timestamps[i] < ec.Start {
		i++
	}
	if i == len(timestamps) {
		qt.Printf("no datapoints found in the cached series on the given timeRange")
		return nil, ec.Start
	}
	if timestamps[i] != ec.Start {
		qt.Printf("cached series don't cover the given timeRange")
		return nil, ec.Start
	}
	j := len(timestamps) - 1
	for j >= 0 && timestamps[j] > ec.End {
		j--
	}
	j++
	if j <= i {
		qt.Printf("no matching samples for the given timeRange")
		return nil, ec.Start
	}
	for _, ts := range tss {
		ts.Timestamps = ts.Timestamps[i:j]
		ts.Values = ts.Values[i:j]
	}
	timestamps = tss[0].Timestamps
	newStart = timestamps[len(timestamps)-1] + ec.Step
	if qt.Enabled() {
		startString := storage.TimestampToHumanReadableFormat(ec.Start)
		endString := storage.TimestampToHumanReadableFormat(newStart - ec.Step)
		qt.Printf("return %d series on a timeRange=[%s..%s]", len(tss), startString, endString)
	}
	return tss, newStart
}
// resultBufPool provides buffers for marshaled and compressed cache entries.
var resultBufPool bytesutil.ByteBufferPool
// PutSeries stores tss for expr with the given window in the cache.
//
// Not everything is stored:
//   - Samples with timestamps newer than now - ec.Step - cacheTimestampOffset are dropped.
//   - Series lists containing duplicate names are not stored, since mergeSeries cannot merge them later.
//   - Nothing is stored if an existing entry already covers the time range.
func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, expr metricsql.Expr, window int64, tss []*timeseries) {
	if qt.Enabled() {
		query := string(expr.AppendString(nil))
		query = stringsutil.LimitStringLen(query, 300)
		qt = qt.NewChild("rollup cache put series: query=%s, timeRange=%s, step=%d, window=%d, series=%d", query, ec.timeRangeString(), ec.Step, window, len(tss))
		defer qt.Done()
	}
	if len(tss) == 0 {
		qt.Printf("do not cache empty series list")
		return
	}
	if len(tss) > 1 {
		// Verify whether tss contains series with duplicate naming.
		// There is little sense in storing such series in the cache, since they cannot be merged in mergeSeries() later.
		bb := bbPool.Get()
		m := make(map[string]struct{}, len(tss))
		for _, ts := range tss {
			bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
			if _, ok := m[string(bb.B)]; ok {
				// Return bb to the pool on this early exit as well.
				bbPool.Put(bb)
				qt.Printf("do not cache series with duplicate naming %s", &ts.MetricName)
				return
			}
			m[string(bb.B)] = struct{}{}
		}
		bbPool.Put(bb)
	}

	// Drop values with timestamps newer than currentTime - step - cacheTimestampOffset,
	// since samples for them may still be ingested later.
	timestamps := tss[0].Timestamps
	deadline := (time.Now().UnixNano() / 1e6) - ec.Step - cacheTimestampOffset.Milliseconds()
	i := len(timestamps) - 1
	for i >= 0 && timestamps[i] > deadline {
		i--
	}
	i++
	if i == 0 {
		// Nothing to store in the cache.
		qt.Printf("nothing to store in the cache, since all the points have timestamps bigger than %d", deadline)
		return
	}
	if i < len(timestamps) {
		timestamps = timestamps[:i]
		// Make a copy of tss and remove unfit values
		rvs := copyTimeseriesShallow(tss)
		for _, ts := range rvs {
			ts.Timestamps = ts.Timestamps[:i]
			ts.Values = ts.Values[:i]
		}
		tss = rvs
	}

	// Store tss in the cache.
	metainfoKey := bbPool.Get()
	defer bbPool.Put(metainfoKey)
	metainfoBuf := bbPool.Get()
	defer bbPool.Put(metainfoBuf)
	metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], expr, window, ec.Step, ec.EnforcedTagFilterss)
	metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)
	var mi rollupResultCacheMetainfo
	if len(metainfoBuf.B) > 0 {
		if err := mi.Unmarshal(metainfoBuf.B); err != nil {
			logger.Panicf("BUG: cannot unmarshal rollupResultCacheMetainfo: %s; it looks like it was improperly saved", err)
		}
	}
	start := timestamps[0]
	end := timestamps[len(timestamps)-1]
	if mi.CoversTimeRange(start, end) {
		if qt.Enabled() {
			startString := storage.TimestampToHumanReadableFormat(start)
			endString := storage.TimestampToHumanReadableFormat(end)
			qt.Printf("series on the given timeRange=[%s..%s] already exist in the cache", startString, endString)
		}
		return
	}

	var key rollupResultCacheKey
	// Load the prefix atomically, since ResetRollupResultCache increments it concurrently
	// from the checkRollupResultCacheReset goroutine.
	key.prefix = atomic.LoadUint64(&rollupResultCacheKeyPrefix)
	key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1)
	bb := bbPool.Get()
	bb.B = key.Marshal(bb.B[:0])
	ok := rrc.putSeriesToCache(qt, bb.B, ec.Step, tss)
	bbPool.Put(bb)
	if !ok {
		return
	}
	mi.AddKey(key, start, end)
	metainfoBuf.B = mi.Marshal(metainfoBuf.B[:0])
	rrc.c.Set(metainfoKey.B, metainfoBuf.B)
}
var (
// rollupResultCacheKeyPrefix is embedded into every cache key.
// ResetRollupResultCache increments it, so entries stored under older prefixes are no longer found.
rollupResultCacheKeyPrefix uint64
// rollupResultCacheKeySuffix is incremented for every series entry stored by PutSeries.
// It is seeded with the current time in nanoseconds — presumably to keep suffixes unique across restarts; confirm.
rollupResultCacheKeySuffix = uint64(time.Now().UnixNano())
)
// getSeriesFromCache returns the series stored under key.
//
// false is returned if the cache has no entry for key. A corrupted entry causes a panic,
// since it means the entry was improperly saved.
func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) {
	compressed := resultBufPool.Get()
	compressed.B = rrc.c.GetBig(compressed.B[:0], key)
	compressedSize := len(compressed.B)
	if compressedSize == 0 {
		qt.Printf("nothing found in the cache")
		resultBufPool.Put(compressed)
		return nil, false
	}
	qt.Printf("load compressed entry from cache with size %d bytes", compressedSize)

	// Decompress into newly allocated byte slice, since tss returned from unmarshalTimeseriesFast
	// refers to the byte slice, so it cannot be re-used.
	raw, err := encoding.DecompressZSTD(nil, compressed.B)
	if err != nil {
		logger.Panicf("BUG: cannot decompress resultBuf from rollupResultCache: %s; it looks like it was improperly saved", err)
	}
	resultBufPool.Put(compressed)
	qt.Printf("unpack the entry into %d bytes", len(raw))

	tss, err := unmarshalTimeseriesFast(raw)
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal timeseries from rollupResultCache: %s; it looks like it was improperly saved", err)
	}
	qt.Printf("unmarshal %d series", len(tss))
	return tss, true
}
// putSeriesToCache marshals tss, compresses the result with ZSTD and stores it under key.
//
// false is returned and the cache is left untouched if the marshaled series would occupy
// more than a quarter of the cache size.
func (rrc *rollupResultCache) putSeriesToCache(qt *querytracer.Tracer, key []byte, step int64, tss []*timeseries) bool {
	sizeLimit := getRollupResultCacheSize() / 4

	marshaled := resultBufPool.Get()
	defer resultBufPool.Put(marshaled)
	marshaled.B = marshalTimeseriesFast(marshaled.B[:0], tss, sizeLimit, step)
	marshaledSize := len(marshaled.B)
	if marshaledSize == 0 {
		// An empty result means the series didn't fit into sizeLimit.
		tooBigRollupResults.Inc()
		qt.Printf("cannot store %d series in the cache, since they would occupy more than %d bytes", len(tss), sizeLimit)
		return false
	}
	qt.Printf("marshal %d series into %d bytes", len(tss), marshaledSize)

	compressed := resultBufPool.Get()
	defer resultBufPool.Put(compressed)
	compressed.B = encoding.CompressZSTDLevel(compressed.B[:0], marshaled.B, 1)
	compressedSize := len(compressed.B)
	qt.Printf("compress %d bytes into %d bytes", marshaledSize, compressedSize)

	rrc.c.SetBig(key, compressed.B)
	qt.Printf("store %d bytes in the cache", compressedSize)
	return true
}
// newRollupResultCacheKeyPrefix returns a key prefix built from 8 bytes of crypto/rand data.
//
// It panics if random data cannot be read.
func newRollupResultCacheKeyPrefix() uint64 {
	var randomBytes [8]byte
	_, err := rand.Read(randomBytes[:])
	if err != nil {
		// do not use logger.Panicf, since it isn't initialized yet.
		panic(fmt.Errorf("FATAL: cannot read random data for rollupResultCacheKeyPrefix: %w", err))
	}
	return encoding.UnmarshalUint64(randomBytes[:])
}
// mustLoadRollupResultCacheKeyPrefix restores rollupResultCacheKeyPrefix from the file
// path + ".key.prefix". The file is written by mustSaveRollupResultCacheKeyPrefix.
//
// A new random prefix is used in these cases:
//   - The file is missing (silently).
//   - The file cannot be read or has an unexpected size (an error is logged).
//
// A new prefix makes the entries loaded from disk unreachable.
func mustLoadRollupResultCacheKeyPrefix(path string) {
	prefixPath := path + ".key.prefix"
	if !fs.IsPathExist(prefixPath) {
		rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
		return
	}
	data, err := os.ReadFile(prefixPath)
	switch {
	case err != nil:
		logger.Errorf("cannot load %s: %s; reset rollupResult cache", prefixPath, err)
	case len(data) != 8:
		logger.Errorf("unexpected size of %s; want 8 bytes; got %d bytes; reset rollupResult cache", prefixPath, len(data))
	default:
		rollupResultCacheKeyPrefix = encoding.UnmarshalUint64(data)
		return
	}
	// The stored prefix is unusable - fall back to a fresh random one.
	rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
}
// mustSaveRollupResultCacheKeyPrefix atomically writes the current rollupResultCacheKeyPrefix
// to path + ".key.prefix", so mustLoadRollupResultCacheKeyPrefix can restore it on the next start.
func mustSaveRollupResultCacheKeyPrefix(path string) {
	path = path + ".key.prefix"
	// Load the prefix atomically, since ResetRollupResultCache may concurrently increment it
	// from the checkRollupResultCacheReset goroutine.
	data := encoding.MarshalUint64(nil, atomic.LoadUint64(&rollupResultCacheKeyPrefix))
	fs.MustWriteAtomic(path, data, true)
}
// tooBigRollupResults counts results rejected by putSeriesToCache because they exceed the per-entry size limit.
var tooBigRollupResults = metrics.NewCounter("vm_too_big_rollup_results_total")
// Increment this value every time the format of the cache changes.
// It is the first byte of every cache key, so entries written in an older format are never found.
const rollupResultCacheVersion = 11
// Key type markers, so series keys and instant-value keys built for the same expression differ.
const (
rollupResultCacheTypeSeries = 0
rollupResultCacheTypeInstantValues = 1
)
// marshalRollupResultCacheKeyForSeries appends to dst the metainfo key for the series of expr
// evaluated with the given window, step and extra tag filters etfs, and returns the result.
//
// The key embeds the current rollupResultCacheKeyPrefix, so it changes after ResetRollupResultCache.
func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
	dst = append(dst, rollupResultCacheVersion)
	// Read the prefix atomically, since ResetRollupResultCache increments it concurrently.
	dst = encoding.MarshalUint64(dst, atomic.LoadUint64(&rollupResultCacheKeyPrefix))
	dst = append(dst, rollupResultCacheTypeSeries)
	dst = encoding.MarshalInt64(dst, window)
	dst = encoding.MarshalInt64(dst, step)
	dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs)
	dst = expr.AppendString(dst)
	return dst
}
// marshalRollupResultCacheKeyForInstantValues appends to dst the key for the instant values of expr
// evaluated with the given window, step and extra tag filters etfs, and returns the result.
//
// The key embeds the current rollupResultCacheKeyPrefix, so it changes after ResetRollupResultCache.
func marshalRollupResultCacheKeyForInstantValues(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
	dst = append(dst, rollupResultCacheVersion)
	// Read the prefix atomically, since ResetRollupResultCache increments it concurrently.
	dst = encoding.MarshalUint64(dst, atomic.LoadUint64(&rollupResultCacheKeyPrefix))
	dst = append(dst, rollupResultCacheTypeInstantValues)
	dst = encoding.MarshalInt64(dst, window)
	dst = encoding.MarshalInt64(dst, step)
	dst = marshalTagFiltersForRollupResultCacheKey(dst, etfs)
	dst = expr.AppendString(dst)
	return dst
}
// marshalTagFiltersForRollupResultCacheKey appends every tag filter group from etfs to dst,
// separating consecutive groups with '|', and returns the result.
func marshalTagFiltersForRollupResultCacheKey(dst []byte, etfs [][]storage.TagFilter) []byte {
	for groupIdx := range etfs {
		if groupIdx > 0 {
			dst = append(dst, '|')
		}
		group := etfs[groupIdx]
		for filterIdx := range group {
			dst = group[filterIdx].Marshal(dst)
		}
	}
	return dst
}
// equalTimestamps reports whether a and b hold the same timestamps in the same order.
// A nil slice equals an empty one.
func equalTimestamps(a, b []int64) bool {
	if len(a) != len(b) {
		return false
	}
	matched := 0
	for matched < len(a) && a[matched] == b[matched] {
		matched++
	}
	return matched == len(a)
}
// mergeSeries concatenates a with b and returns the result.
//
// true is returned on successful concatenation, false otherwise.
// Concatenation fails if either a or b contains multiple series with the same name,
// since such series cannot be matched against each other.
//
// Preconditions:
// - bStart must be in the range [ec.Start .. ec.End]
// - a must contain series with all the samples on the range [ec.Start ... bStart - ec.Step] with ec.Step interval between them
// - b must contain series with all the samples on the range [bStart .. ec.End] with ec.Step interval between them
//
// Postconditions:
// - the returned series contain all the samples on the range [ec.Start .. ec.End] with ec.Step interval between them
// - series present only in b are padded with NaNs on [ec.Start .. bStart - ec.Step],
//   while series present only in a are padded with NaNs on [bStart .. ec.End].
// - the relative order of series present only in a is unspecified, since they are collected from a map.
// - a and b cannot be used after returning from the call.
func mergeSeries(qt *querytracer.Tracer, a, b []*timeseries, bStart int64, ec *EvalConfig) ([]*timeseries, bool) {
if qt.Enabled() {
qt = qt.NewChild("merge series on time range %s with step=%dms; len(a)=%d, len(b)=%d, bStart=%s",
ec.timeRangeString(), ec.Step, len(a), len(b), storage.TimestampToHumanReadableFormat(bStart))
defer qt.Done()
}
// Split the shared timestamps at bStart: the head belongs to a, the tail belongs to b.
sharedTimestamps := ec.getSharedTimestamps()
i := 0
for i < len(sharedTimestamps) && sharedTimestamps[i] < bStart {
i++
}
aTimestamps := sharedTimestamps[:i]
bTimestamps := sharedTimestamps[i:]
if len(bTimestamps) == len(sharedTimestamps) {
// Nothing to merge - just return b to the caller
for _, tsB := range b {
if !equalTimestamps(tsB.Timestamps, bTimestamps) {
logger.Panicf("BUG: invalid timestamps in b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
}
// Timestamps now points to the shared ec timestamps; denyReuse is set for such series
// (presumably to keep the shared slice from being recycled - confirm against the timeseries definition).
tsB.denyReuse = true
tsB.Timestamps = sharedTimestamps
}
return b, true
}
bb := bbPool.Get()
defer bbPool.Put(bb)
// Index a by sorted metric name, so b series can be matched against it.
mA := make(map[string]*timeseries, len(a))
for _, ts := range a {
if !equalTimestamps(ts.Timestamps, aTimestamps) {
logger.Panicf("BUG: invalid timestamps in a series %s; got %d; want %d", &ts.MetricName, ts.Timestamps, aTimestamps)
}
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := mA[string(bb.B)]; ok {
qt.Printf("cannot merge series because a series contain duplicate %s", &ts.MetricName)
return nil, false
}
mA[string(bb.B)] = ts
}
// mB is used only for detecting duplicate names in b.
mB := make(map[string]struct{}, len(b))
rvs := make([]*timeseries, 0, len(a))
// aNaNs is the NaN padding for the a part; it is built once and copied into every b-only series.
var aNaNs []float64
for _, tsB := range b {
if !equalTimestamps(tsB.Timestamps, bTimestamps) {
logger.Panicf("BUG: invalid timestamps for b series %s; got %d; want %d", &tsB.MetricName, tsB.Timestamps, bTimestamps)
}
bb.B = marshalMetricNameSorted(bb.B[:0], &tsB.MetricName)
if _, ok := mB[string(bb.B)]; ok {
qt.Printf("cannot merge series because b series contain duplicate %s", &tsB.MetricName)
return nil, false
}
mB[string(bb.B)] = struct{}{}
var tmp timeseries
tmp.denyReuse = true
tmp.Timestamps = sharedTimestamps
tmp.Values = make([]float64, 0, len(tmp.Timestamps))
// bb.B already holds the marshaled name, so the lookups below stay valid after
// tmp takes over tsB.MetricName via MoveFrom.
tmp.MetricName.MoveFrom(&tsB.MetricName)
tsA := mA[string(bb.B)]
if tsA == nil {
if aNaNs == nil {
tStart := ec.Start
for tStart < bStart {
aNaNs = append(aNaNs, nan)
tStart += ec.Step
}
}
tmp.Values = append(tmp.Values, aNaNs...)
} else {
tmp.Values = append(tmp.Values, tsA.Values...)
// Remove the matched series, so only a-only series remain in mA afterwards.
delete(mA, string(bb.B))
}
tmp.Values = append(tmp.Values, tsB.Values...)
rvs = append(rvs, &tmp)
}
// Copy the remaining timeseries from mA.
// bNaNs is the NaN padding for the b part; it is built once and copied into every a-only series.
var bNaNs []float64
for _, tsA := range mA {
var tmp timeseries
tmp.denyReuse = true
tmp.Timestamps = sharedTimestamps
tmp.Values = make([]float64, 0, len(tmp.Timestamps))
tmp.MetricName.MoveFrom(&tsA.MetricName)
tmp.Values = append(tmp.Values, tsA.Values...)
if bNaNs == nil {
tStart := bStart
for tStart <= ec.End {
bNaNs = append(bNaNs, nan)
tStart += ec.Step
}
}
tmp.Values = append(tmp.Values, bNaNs...)
rvs = append(rvs, &tmp)
}
qt.Printf("resulting series=%d", len(rvs))
return rvs, true
}
// rollupResultCacheMetainfo is stored under the key built by marshalRollupResultCacheKeyForSeries.
// It lists the cached series entries for that key together with the time ranges they cover.
type rollupResultCacheMetainfo struct {
// entries are ordered from the oldest to the newest, since AddKey appends new entries.
entries []rollupResultCacheMetainfoEntry
}
// Marshal appends the binary form of mi to dst and returns the result.
//
// The format is a uint32 entry count followed by the marshaled entries.
func (mi *rollupResultCacheMetainfo) Marshal(dst []byte) []byte {
	entriesCount := uint32(len(mi.entries))
	dst = encoding.MarshalUint32(dst, entriesCount)
	for idx := range mi.entries {
		entry := &mi.entries[idx]
		dst = entry.Marshal(dst)
	}
	return dst
}
// Unmarshal decodes mi from src, which must have been produced by Marshal.
//
// The capacity of the existing mi.entries is reused. An error is returned in these cases:
//   - src is truncated.
//   - src declares more entries than it can hold.
//   - src has trailing bytes.
func (mi *rollupResultCacheMetainfo) Unmarshal(src []byte) error {
	if len(src) < 4 {
		return fmt.Errorf("cannot unmarshal len(entries) from %d bytes; need at least %d bytes", len(src), 4)
	}
	entriesLen := int(encoding.UnmarshalUint32(src))
	src = src[4:]

	// Each marshaled entry occupies 4*8 bytes (start, end, key prefix and key suffix).
	// Reject impossible entry counts before allocating memory for them,
	// so corrupted data cannot trigger a huge allocation.
	const entrySize = 4 * 8
	if entriesLen < 0 || entriesLen > len(src)/entrySize {
		return fmt.Errorf("cannot unmarshal %d entries from %d bytes; each entry needs %d bytes", entriesLen, len(src), entrySize)
	}

	if n := entriesLen - cap(mi.entries); n > 0 {
		mi.entries = append(mi.entries[:cap(mi.entries)], make([]rollupResultCacheMetainfoEntry, n)...)
	}
	mi.entries = mi.entries[:entriesLen]
	for i := 0; i < entriesLen; i++ {
		tail, err := mi.entries[i].Unmarshal(src)
		if err != nil {
			return fmt.Errorf("cannot unmarshal entry #%d: %w", i, err)
		}
		src = tail
	}
	if len(src) > 0 {
		return fmt.Errorf("unexpected non-empty tail left; len(tail)=%d", len(src))
	}
	return nil
}
// CoversTimeRange reports whether a single entry of mi fully covers [start ... end].
//
// It panics if start > end.
func (mi *rollupResultCacheMetainfo) CoversTimeRange(start, end int64) bool {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	for _, e := range mi.entries {
		if e.start <= start && e.end >= end {
			return true
		}
	}
	return false
}
// GetBestKey returns the key of the entry that starts no later than start and covers the
// largest part of [start ... end]. On ties, the later entry wins.
//
// The zero key is returned if no entry qualifies. It panics if start > end.
func (mi *rollupResultCacheMetainfo) GetBestKey(start, end int64) rollupResultCacheKey {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	var best rollupResultCacheKey
	var bestCoverage int64
	for i := range mi.entries {
		e := &mi.entries[i]
		if e.start > start {
			// The entry misses the beginning of the requested range.
			continue
		}
		coveredEnd := e.end
		if coveredEnd >= end {
			coveredEnd = end
		}
		coverage := coveredEnd - start
		if coverage < bestCoverage {
			continue
		}
		bestCoverage = coverage
		best = e.key
	}
	return best
}
// AddKey appends an entry for key covering [start ... end].
//
// The list is bounded: when it grows past 10 entries, the 5 oldest ones are dropped.
// It panics if start > end.
func (mi *rollupResultCacheMetainfo) AddKey(key rollupResultCacheKey, start, end int64) {
	if start > end {
		logger.Panicf("BUG: start cannot exceed end; got %d vs %d", start, end)
	}
	entry := rollupResultCacheMetainfoEntry{
		start: start,
		end:   end,
		key:   key,
	}
	mi.entries = append(mi.entries, entry)
	if len(mi.entries) <= 10 {
		return
	}
	// Remove old entries.
	kept := copy(mi.entries, mi.entries[5:])
	mi.entries = mi.entries[:kept]
}
// RemoveKey removes the first entry with the given key from mi.
// It does nothing if no entry has that key.
func (mi *rollupResultCacheMetainfo) RemoveKey(key rollupResultCacheKey) {
	for idx, e := range mi.entries {
		if e.key != key {
			continue
		}
		mi.entries = append(mi.entries[:idx], mi.entries[idx+1:]...)
		return
	}
}
// rollupResultCacheMetainfoEntry references a cached series entry by key.
type rollupResultCacheMetainfoEntry struct {
// start and end are the timestamps of the first and the last samples stored in the entry.
start int64
end int64
// key locates the compressed series in the cache.
key rollupResultCacheKey
}
// Marshal appends mie to dst as four consecutive fields: start, end, key prefix and key suffix.
func (mie *rollupResultCacheMetainfoEntry) Marshal(dst []byte) []byte {
	// The time range goes first, then the key.
	dst = encoding.MarshalInt64(encoding.MarshalInt64(dst, mie.start), mie.end)
	return encoding.MarshalUint64(encoding.MarshalUint64(dst, mie.key.prefix), mie.key.suffix)
}
// Unmarshal decodes mie from the head of src and returns the remaining tail.
//
// On error, the returned slice is the part of src starting at the field that couldn't be decoded.
func (mie *rollupResultCacheMetainfoEntry) Unmarshal(src []byte) ([]byte, error) {
	const fieldSize = 8

	if len(src) < fieldSize {
		return src, fmt.Errorf("cannot unmarshal start from %d bytes; need at least %d bytes", len(src), fieldSize)
	}
	mie.start, src = encoding.UnmarshalInt64(src), src[fieldSize:]

	if len(src) < fieldSize {
		return src, fmt.Errorf("cannot unmarshal end from %d bytes; need at least %d bytes", len(src), fieldSize)
	}
	mie.end, src = encoding.UnmarshalInt64(src), src[fieldSize:]

	if len(src) < fieldSize {
		return src, fmt.Errorf("cannot unmarshal key prefix from %d bytes; need at least %d bytes", len(src), fieldSize)
	}
	mie.key.prefix, src = encoding.UnmarshalUint64(src), src[fieldSize:]

	if len(src) < fieldSize {
		return src, fmt.Errorf("cannot unmarshal key suffix from %d bytes; need at least %d bytes", len(src), fieldSize)
	}
	mie.key.suffix, src = encoding.UnmarshalUint64(src), src[fieldSize:]

	return src, nil
}
// rollupResultCacheKey must be globally unique across vmselect nodes,
// so it has prefix and suffix.
//
// The zero key (prefix == 0 && suffix == 0) means "no key". GetBestKey returns it when no entry matches.
type rollupResultCacheKey struct {
// prefix is the value of rollupResultCacheKeyPrefix at the time the entry was stored.
prefix uint64
// suffix is a per-entry value obtained by incrementing rollupResultCacheKeySuffix.
suffix uint64
}
// Marshal appends the cache key for k to dst and returns the result.
//
// The key is rollupResultCacheVersion followed by the marshaled prefix and suffix.
func (k *rollupResultCacheKey) Marshal(dst []byte) []byte {
	dst = append(dst, rollupResultCacheVersion)
	for _, part := range [2]uint64{k.prefix, k.suffix} {
		dst = encoding.MarshalUint64(dst, part)
	}
	return dst
}