diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 952e8d572..570257eb2 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -472,22 +472,6 @@ func removeNanValues(dstValues []float64, dstTimestamps []int64, values []float6
 	return dstValues, dstTimestamps
 }
 
-func getMaxPointsPerRollup() int {
-	maxPointsPerRollupOnce.Do(func() {
-		n := memory.Allowed() / 16 / 8
-		if n <= 16 {
-			n = 16
-		}
-		maxPointsPerRollup = n
-	})
-	return maxPointsPerRollup
-}
-
-var (
-	maxPointsPerRollup     int
-	maxPointsPerRollupOnce sync.Once
-)
-
 var (
 	rollupResultCacheFullHits    = metrics.NewCounter(`vm_rollup_result_cache_full_hits_total`)
 	rollupResultCachePartialHits = metrics.NewCounter(`vm_rollup_result_cache_partial_hits_total`)
@@ -533,12 +517,17 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
 	// Verify timeseries fit available memory after the rollup.
 	// Take into account points from tssCached.
 	pointsPerTimeseries := 1 + (ec.End-ec.Start)/ec.Step
-	if uint64(pointsPerTimeseries) > uint64(getMaxPointsPerRollup()/rssLen/len(rcs)) {
+	rollupPoints := mulNoOverflow(pointsPerTimeseries, int64(rssLen*len(rcs)))
+	rollupMemorySize := mulNoOverflow(rollupPoints, 16)
+	rml := getRollupMemoryLimiter()
+	if !rml.Get(uint64(rollupMemorySize)) {
 		rss.Cancel()
-		return nil, fmt.Errorf("cannot process more than %d data points for %d time series with %d points in each time series; "+
-			"possible solutions are: reducing the number of matching time series; switching to node with more RAM; increasing `step` query arg (%gs)",
-			getMaxPointsPerRollup(), rssLen*len(rcs), pointsPerTimeseries, float64(ec.Step)/1e3)
+		return nil, fmt.Errorf("not enough memory for processing %d data points across %d time series with %d points in each time series; "+
+			"possible solutions are: reducing the number of matching time series; switching to node with more RAM; "+
+			"increasing -memory.allowedPercent; increasing `step` query arg (%gs)",
+			rollupPoints, rssLen*len(rcs), pointsPerTimeseries, float64(ec.Step)/1e3)
 	}
+	defer rml.Put(uint64(rollupMemorySize))
 
 	// Evaluate rollup
 	tss := make([]*timeseries, 0, rssLen*len(rcs))
@@ -575,6 +564,18 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
 	return tss, nil
 }
 
+var (
+	rollupMemoryLimiter     memoryLimiter
+	rollupMemoryLimiterOnce sync.Once
+)
+
+func getRollupMemoryLimiter() *memoryLimiter {
+	rollupMemoryLimiterOnce.Do(func() {
+		rollupMemoryLimiter.MaxSize = uint64(memory.Allowed()) / 4
+	})
+	return &rollupMemoryLimiter
+}
+
 func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) {
 	preFunc := func(values []float64, timestamps []int64) {}
 	if rollupFuncsRemoveCounterResets[name] {
@@ -653,3 +654,11 @@ func evalTime(ec *EvalConfig) []*timeseries {
 	}
 	return rv
 }
+
+func mulNoOverflow(a, b int64) int64 {
+	if math.MaxInt64/b < a {
+		// Overflow
+		return math.MaxInt64
+	}
+	return a * b
+}
diff --git a/app/vmselect/promql/memory_limiter.go b/app/vmselect/promql/memory_limiter.go
new file mode 100644
index 000000000..e9a76b143
--- /dev/null
+++ b/app/vmselect/promql/memory_limiter.go
@@ -0,0 +1,33 @@
+package promql
+
+import (
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+)
+
+type memoryLimiter struct {
+	MaxSize uint64
+
+	mu    sync.Mutex
+	usage uint64
+}
+
+func (ml *memoryLimiter) Get(n uint64) bool {
+	ml.mu.Lock()
+	ok := n <= ml.MaxSize && ml.MaxSize-n >= ml.usage
+	if ok {
+		ml.usage += n
+	}
+	ml.mu.Unlock()
+	return ok
+}
+
+func (ml *memoryLimiter) Put(n uint64) {
+	ml.mu.Lock()
+	if n > ml.usage {
+		logger.Panicf("BUG: n=%d cannot exceed %d", n, ml.usage)
+	}
+	ml.usage -= n
+	ml.mu.Unlock()
+}
diff --git a/app/vmselect/promql/memory_limiter_test.go b/app/vmselect/promql/memory_limiter_test.go
new file mode 100644
index 000000000..4477678e4
--- /dev/null
+++ b/app/vmselect/promql/memory_limiter_test.go
@@ -0,0 +1,56 @@
+package promql
+
+import (
+	"testing"
+)
+
+func TestMemoryLimiter(t *testing.T) {
+	var ml memoryLimiter
+	ml.MaxSize = 100
+
+	// Allocate memory
+	if !ml.Get(10) {
+		t.Fatalf("cannot get 10 out of %d bytes", ml.MaxSize)
+	}
+	if ml.usage != 10 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 10)
+	}
+	if !ml.Get(20) {
+		t.Fatalf("cannot get 20 out of 90 bytes")
+	}
+	if ml.usage != 30 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
+	}
+	if ml.Get(1000) {
+		t.Fatalf("unexpected get for 1000 bytes")
+	}
+	if ml.usage != 30 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
+	}
+	if ml.Get(71) {
+		t.Fatalf("unexpected get for 71 bytes")
+	}
+	if ml.usage != 30 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 30)
+	}
+	if !ml.Get(70) {
+		t.Fatalf("cannot get 70 bytes")
+	}
+	if ml.usage != 100 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 100)
+	}
+
+	// Return memory back
+	ml.Put(10)
+	ml.Put(70)
+	if ml.usage != 20 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 20)
+	}
+	if !ml.Get(30) {
+		t.Fatalf("cannot get 30 bytes")
+	}
+	ml.Put(50)
+	if ml.usage != 0 {
+		t.Fatalf("unexpected usage; got %d; want %d", ml.usage, 0)
+	}
+}
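
The check added above follows a simple reserve/release pattern: estimate the rollup buffer as points per series * number of output series * 16 bytes, reserve that amount from a shared limiter before evaluation, and return it when the query completes. The sketch below restates that flow outside the diff so it can be read and run on its own; the memoryLimiter here mirrors the one in memory_limiter.go, while the reserveRollupMemory helper, its arguments, and the 1 GiB budget are illustrative assumptions rather than part of the patch, which derives the budget from memory.Allowed()/4 and guards the multiplications with mulNoOverflow.

package main

import (
	"fmt"
	"sync"
)

// memoryLimiter mirrors the limiter added in memory_limiter.go: a
// mutex-protected usage counter with a hard MaxSize cap.
type memoryLimiter struct {
	MaxSize uint64

	mu    sync.Mutex
	usage uint64
}

// Get reserves n bytes if the reservation still fits under MaxSize and
// reports whether it succeeded.
func (ml *memoryLimiter) Get(n uint64) bool {
	ml.mu.Lock()
	defer ml.mu.Unlock()
	if n <= ml.MaxSize && ml.MaxSize-n >= ml.usage {
		ml.usage += n
		return true
	}
	return false
}

// Put releases n previously reserved bytes.
func (ml *memoryLimiter) Put(n uint64) {
	ml.mu.Lock()
	defer ml.mu.Unlock()
	if n > ml.usage {
		panic(fmt.Sprintf("BUG: n=%d cannot exceed %d", n, ml.usage))
	}
	ml.usage -= n
}

// reserveRollupMemory is a hypothetical helper illustrating the estimate used
// in evalRollupFuncWithMetricExpr: points per series * series count * 16 bytes
// per point. The actual patch additionally protects these multiplications
// against overflow via mulNoOverflow.
func reserveRollupMemory(ml *memoryLimiter, pointsPerTimeseries, seriesCount int64) (release func(), ok bool) {
	size := uint64(pointsPerTimeseries) * uint64(seriesCount) * 16
	if !ml.Get(size) {
		return nil, false
	}
	return func() { ml.Put(size) }, true
}

func main() {
	// Assumed 1 GiB budget for the sketch; the patch uses memory.Allowed()/4.
	ml := &memoryLimiter{MaxSize: 1 << 30}
	release, ok := reserveRollupMemory(ml, 1000, 500)
	if !ok {
		fmt.Println("not enough memory for the rollup")
		return
	}
	defer release()
	fmt.Println("rollup memory reserved; evaluating rollup...")
}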