Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

commit 8bbc83e85e
Aliaksandr Valialkin, 2020-10-17 12:13:56 +03:00
7 changed files with 271 additions and 29 deletions

View file

@@ -5,6 +5,11 @@
* `rollup_func(foo{filters}[d]) op bar` -> `rollup_func(foo{filters}[d]) op bar{filters}`
* `transform_func(foo{filters}) op bar` -> `transform_func(foo{filters}) op bar{filters}`
* `num_or_scalar op foo{filters} op bar` -> `num_or_scalar op foo{filters} op bar{filters}`
* FEATURE: improve time series search for queries with multiple label filters, e.g. `foo{label1="value", label2=~"regexp"}`.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781
* BUGFIX: vmagent: properly handle OpenStack endpoints ending with `v3.0`, such as `https://ostack.example.com:5000/v3.0`,
in the same way as Prometheus does. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/728#issuecomment-709914803
# [v1.44.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.44.0)

View file

@@ -519,9 +519,10 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
if j == len(timestamps) && i < j && tEnd-timestamps[j-1] > stalenessInterval {
// Do not take into account the last data point in time series if the distance between this data point
// and tEnd exceeds stalenessInterval.
if j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) {
// Drop trailing data points in the following cases:
// - if the distance between the last raw sample and tEnd exceeds stalenessInterval
// - if time series contains only a single raw sample
// This should prevent double counting when a label changes in a time series (for instance,
// during a new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
rfa.prevValue = nan
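For context, a minimal, self-contained sketch of the new staleness check (the `dropTrailingSample` helper name is hypothetical; it is not the actual `rollupConfig` plumbing):

```go
package main

import "fmt"

// dropTrailingSample mirrors the condition added above: the previous value
// is reset to NaN when the last raw sample is farther than stalenessInterval
// from the window end tEnd, or when the series holds only a single raw sample.
func dropTrailingSample(timestamps []int64, i, j int, tEnd, stalenessInterval int64) bool {
	return j == len(timestamps) && j > 0 &&
		(tEnd-timestamps[j-1] > stalenessInterval ||
			i == j && len(timestamps) == 1)
}

func main() {
	ts := []int64{1000, 2000, 3000} // raw sample timestamps in milliseconds
	// The last sample is 7000ms away from tEnd, which exceeds the
	// 5000ms staleness interval, so the trailing sample is dropped.
	fmt.Println(dropTrailingSample(ts, 0, 3, 10000, 5000)) // true
	// Here the last sample is only 1000ms away, so it is kept.
	fmt.Println(dropTrailingSample(ts, 0, 3, 4000, 5000)) // false
}
```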

View file

@@ -71,7 +71,7 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `ideriv(m)` - for calculating `instant` derivative for `m`.
- `deriv_fast(m[d])` - for calculating `fast` derivative for `m` based on the first and the last points from duration `d`.
- `running_` functions - `running_sum`, `running_min`, `running_max`, `running_avg` - for calculating [running values](https://en.wikipedia.org/wiki/Running_total) on the selected time range.
- `range_` functions - `range_sum`, `range_min`, `range_max`, `range_avg`, `range_first`, `range_last`, `range_median`, `range_quantile` - for calculating global value over the selected time range.
- `range_` functions - `range_sum`, `range_min`, `range_max`, `range_avg`, `range_first`, `range_last`, `range_median`, `range_quantile` - for calculating a global value over the selected time range. For example, `range_max(q)` returns a series where each point equals the maximum of `q` over the selected time range. Note that the global value is based on the calculated datapoints for the inner query. The calculated datapoints can differ from the raw datapoints stored in the database. See [these docs](https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness) for details.
- `smooth_exponential(q, sf)` - smooths `q` using [exponential moving average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) with the given smooth factor `sf`.
- `remove_resets(q)` - removes counter resets from `q`.
- `lag(q[d])` - returns lag between the current timestamp and the timestamp from the previous data point in `q` over `d`.
@@ -101,8 +101,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`.
For example, the following query calculates median temperature by country over the last 24 hours:
`histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`.
- `histogram_share(le, buckets)` - returns share (in the range 0..1) for `buckets`. Useful for calculating SLI and SLO.
For instance, the following query returns the share of requests which are performed under 1.5 seconds: `histogram_share(1.5, sum(request_duration_seconds_bucket) by (le))`.
- `histogram_share(le, buckets)` - returns the share (in the range 0..1) of `buckets` that fall below `le`. Useful for calculating SLI and SLO.
For instance, the following query returns the share of requests which are performed under 1.5 seconds during the last 5 minutes: `histogram_share(1.5, sum(rate(request_duration_seconds_bucket[5m])) by (le))`.
- `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series -
see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details.
- `topk_min(k, q)` - returns top K time series with the max minimums on the given time range

View file

@@ -3,11 +3,13 @@ package openstack
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"sync"
"time"
@@ -95,6 +97,11 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
// override sdc
sdcAuth = readCredentialsFromEnv()
}
if strings.HasSuffix(sdcAuth.IdentityEndpoint, "v2.0") {
return nil, errors.New("identity_endpoint v2.0 is not supported")
}
// trim .0 from v3.0 for prometheus cfg compatibility
sdcAuth.IdentityEndpoint = strings.TrimSuffix(sdcAuth.IdentityEndpoint, ".0")
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
if err != nil {
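A standalone sketch of the endpoint handling added above (the `normalizeIdentityEndpoint` helper name is hypothetical; the real code mutates `sdcAuth.IdentityEndpoint` in place):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// normalizeIdentityEndpoint rejects v2.0 identity endpoints and trims a
// trailing ".0" from "v3.0" so the endpoint matches what Prometheus expects.
func normalizeIdentityEndpoint(endpoint string) (string, error) {
	if strings.HasSuffix(endpoint, "v2.0") {
		return "", errors.New("identity_endpoint v2.0 is not supported")
	}
	return strings.TrimSuffix(endpoint, ".0"), nil
}

func main() {
	ep, err := normalizeIdentityEndpoint("https://ostack.example.com:5000/v3.0")
	fmt.Println(ep, err) // https://ostack.example.com:5000/v3 <nil>
}
```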

View file

@@ -2153,7 +2153,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
}
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffxies.
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
@@ -2563,6 +2563,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// This way we limit the amount of work below by applying more specific filters first.
type tagFilterWithCount struct {
tf *tagFilter
cost uint64
count uint64
}
tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
@@ -2578,13 +2579,14 @@
}
tfsWithCount[i] = tagFilterWithCount{
tf: tf,
cost: count * tf.matchCost,
count: count,
}
}
sort.Slice(tfsWithCount, func(i, j int) bool {
a, b := &tfsWithCount[i], &tfsWithCount[j]
if a.count != b.count {
return a.count < b.count
if a.cost != b.cost {
return a.cost < b.cost
}
return a.tf.Less(b.tf)
})
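A rough standalone illustration of why the sort key changed: weighting each filter's matching-entry count by its per-entry match cost lets a cheap exact-match filter with many entries still run before an expensive regexp filter with fewer entries (the filter names and numbers below are made up for the example):

```go
package main

import (
	"fmt"
	"sort"
)

type filterStats struct {
	name      string
	count     uint64 // number of matching index entries
	matchCost uint64 // per-entry cost (cf. the benchmark-derived constants added in this commit)
}

func main() {
	filters := []filterStats{
		{`label2=~"regexp"`, 1000, 100}, // fewer entries, but expensive regexp matching
		{`label1="value"`, 5000, 1},     // more entries, but cheap full match
	}
	// Sort by cost = count * matchCost instead of by count alone.
	sort.Slice(filters, func(i, j int) bool {
		return filters[i].count*filters[i].matchCost < filters[j].count*filters[j].matchCost
	})
	// label1="value" now runs first: 5000*1 = 5000 < 1000*100 = 100000.
	fmt.Println(filters[0].name)
}
```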

View file

@@ -153,6 +153,7 @@ type tagFilter struct {
value []byte
isNegative bool
isRegexp bool
matchCost uint64
// Prefix always contains {nsPrefixTagToMetricIDs, key}.
// Additionally it contains:
@@ -267,6 +268,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
// during the search for matching metricIDs.
tf.orSuffixes = append(tf.orSuffixes[:0], "")
tf.isEmptyMatch = len(prefix) == 0
tf.matchCost = fullMatchCost
return nil
}
rcv, err := getRegexpFromCache(expr)
@@ -275,6 +277,7 @@ }
}
tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...)
tf.reSuffixMatch = rcv.reMatch
tf.matchCost = rcv.reCost
tf.isEmptyMatch = len(prefix) == 0 && tf.reSuffixMatch(nil)
if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 {
// Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots.
@@ -339,6 +342,7 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
sExpr := string(expr)
orValues := getOrValues(sExpr)
var reMatch func(b []byte) bool
var reCost uint64
var literalSuffix string
if len(orValues) > 0 {
if len(orValues) == 1 {
@@ -356,13 +360,15 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
return false
}
}
reCost = uint64(len(orValues)) * literalMatchCost
} else {
reMatch, literalSuffix = getOptimizedReMatchFunc(re.Match, sExpr)
reMatch, literalSuffix, reCost = getOptimizedReMatchFunc(re.Match, sExpr)
}
// Put the reMatch in the cache.
rcv.orValues = orValues
rcv.reMatch = reMatch
rcv.reCost = reCost
rcv.literalSuffix = literalSuffix
regexpCacheLock.Lock()
@@ -397,32 +403,42 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
// It returns reMatch if it cannot find an optimized function.
//
// It also returns the literal suffix from the expr.
func getOptimizedReMatchFunc(reMatch func(b []byte) bool, expr string) (func(b []byte) bool, string) {
func getOptimizedReMatchFunc(reMatch func(b []byte) bool, expr string) (func(b []byte) bool, string, uint64) {
sre, err := syntax.Parse(expr, syntax.Perl)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing verified expr=%q: %s", expr, err)
}
if matchFunc, literalSuffix := getOptimizedReMatchFuncExt(reMatch, sre); matchFunc != nil {
if matchFunc, literalSuffix, reCost := getOptimizedReMatchFuncExt(reMatch, sre); matchFunc != nil {
// Found optimized function for matching the expr.
suffixUnescaped := tagCharsReverseRegexpEscaper.Replace(literalSuffix)
return matchFunc, suffixUnescaped
return matchFunc, suffixUnescaped, reCost
}
// Fall back to un-optimized reMatch.
return reMatch, ""
return reMatch, "", reMatchCost
}
func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp) (func(b []byte) bool, string) {
// The following cost values (including the default reMatchCost) are derived from BenchmarkOptimizedReMatchCost results
const (
fullMatchCost = 1
prefixMatchCost = 2
literalMatchCost = 3
suffixMatchCost = 4
middleMatchCost = 6
reMatchCost = 100
)
func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp) (func(b []byte) bool, string, uint64) {
if isDotStar(sre) {
// '.*'
return func(b []byte) bool {
return true
}, ""
}, "", fullMatchCost
}
if isDotPlus(sre) {
// '.+'
return func(b []byte) bool {
return len(b) > 0
}, ""
}, "", fullMatchCost
}
switch sre.Op {
case syntax.OpCapture:
@@ -430,13 +446,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
return getOptimizedReMatchFuncExt(reMatch, sre.Sub[0])
case syntax.OpLiteral:
if !isLiteral(sre) {
return nil, ""
return nil, "", 0
}
s := string(sre.Rune)
// Literal match
return func(b []byte) bool {
return string(b) == s
}, s
}, s, literalMatchCost
case syntax.OpConcat:
if len(sre.Sub) == 2 {
if isLiteral(sre.Sub[0]) {
@@ -445,13 +461,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// 'prefix.*'
return func(b []byte) bool {
return bytes.HasPrefix(b, prefix)
}, ""
}, "", prefixMatchCost
}
if isDotPlus(sre.Sub[1]) {
// 'prefix.+'
return func(b []byte) bool {
return len(b) > len(prefix) && bytes.HasPrefix(b, prefix)
}, ""
}, "", prefixMatchCost
}
}
if isLiteral(sre.Sub[1]) {
@@ -460,13 +476,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// '.*suffix'
return func(b []byte) bool {
return bytes.HasSuffix(b, suffix)
}, string(suffix)
}, string(suffix), suffixMatchCost
}
if isDotPlus(sre.Sub[0]) {
// '.+suffix'
return func(b []byte) bool {
return len(b) > len(suffix) && bytes.HasSuffix(b[1:], suffix)
}, string(suffix)
}, string(suffix), suffixMatchCost
}
}
}
@@ -477,13 +493,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// '.*middle.*'
return func(b []byte) bool {
return bytes.Contains(b, middle)
}, ""
}, "", middleMatchCost
}
if isDotPlus(sre.Sub[2]) {
// '.*middle.+'
return func(b []byte) bool {
return len(b) > len(middle) && bytes.Contains(b[:len(b)-1], middle)
}, ""
}, "", middleMatchCost
}
}
if isDotPlus(sre.Sub[0]) {
@@ -491,13 +507,13 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
// '.+middle.*'
return func(b []byte) bool {
return len(b) > len(middle) && bytes.Contains(b[1:], middle)
}, ""
}, "", middleMatchCost
}
if isDotPlus(sre.Sub[2]) {
// '.+middle.+'
return func(b []byte) bool {
return len(b) > len(middle)+1 && bytes.Contains(b[1:len(b)-1], middle)
}, ""
}, "", middleMatchCost
}
}
}
@@ -531,9 +547,9 @@ func getOptimizedReMatchFuncExt(reMatch func(b []byte) bool, sre *syntax.Regexp)
}
// Fall back to slow path.
return reMatch(bOrig)
}, string(suffix)
}, string(suffix), reMatchCost
default:
return nil, ""
return nil, "", 0
}
}
@@ -720,6 +736,7 @@ var (
type regexpCacheValue struct {
orValues []string
reMatch func(b []byte) bool
reCost uint64
literalSuffix string
}
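For intuition on how the constants combine, a small standalone example of the `reCost = uint64(len(orValues)) * literalMatchCost` path from the diff above, using the benchmark-derived values:

```go
package main

import "fmt"

// Constants copied from the diff above.
const (
	literalMatchCost uint64 = 3
	reMatchCost      uint64 = 100
)

func main() {
	// `foo|bar|baz` is expanded into three plain string comparisons,
	// so its cost is 3 * literalMatchCost = 9: much cheaper than
	// falling back to the generic regexp engine at reMatchCost = 100.
	orValues := []string{"foo", "bar", "baz"}
	fmt.Println(uint64(len(orValues)) * literalMatchCost) // 9
	fmt.Println(reMatchCost)                              // 100
}
```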

View file

@@ -1,6 +1,8 @@
package storage
import (
"bytes"
"regexp"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -307,3 +309,211 @@ func BenchmarkTagFilterMatchSuffix(b *testing.B) {
})
})
}
// Run the following commands to get the execution cost of all match types:
//
// go test -run=none -bench=BenchmarkOptimizedReMatchCost -count 20 github.com/VictoriaMetrics/VictoriaMetrics/lib/storage | tee cost.txt
// benchstat ./cost.txt
//
// Then calculate the overhead of each match type as a multiplier of the default (cheapest) one.
func BenchmarkOptimizedReMatchCost(b *testing.B) {
b.Run("fullMatchCost", func(b *testing.B) {
reMatch := func(b []byte) bool {
return len(b) == 0
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run("literalMatchCost", func(b *testing.B) {
s := "foo1.bar.baz.sss.ddd"
reMatch := func(b []byte) bool {
return string(b) == s
}
suffix := []byte(s)
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run("threeLiteralsMatchCost", func(b *testing.B) {
s := []string{"foo", "bar", "baz"}
reMatch := func(b []byte) bool {
for _, v := range s {
if string(b) == v {
return true
}
}
return false
}
suffix := []byte("ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".*", func(b *testing.B) {
reMatch := func(b []byte) bool {
return true
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".+", func(b *testing.B) {
reMatch := func(b []byte) bool {
return len(b) > 0
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run("prefix.*", func(b *testing.B) {
s := []byte("foo1.bar")
reMatch := func(b []byte) bool {
return bytes.HasPrefix(b, s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run("prefix.+", func(b *testing.B) {
s := []byte("foo1.bar")
reMatch := func(b []byte) bool {
return len(b) > len(s) && bytes.HasPrefix(b, s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".*suffix", func(b *testing.B) {
s := []byte("sss.ddd")
reMatch := func(b []byte) bool {
return bytes.HasSuffix(b, s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".+suffix", func(b *testing.B) {
s := []byte("sss.ddd")
reMatch := func(b []byte) bool {
return len(b) > len(s) && bytes.HasSuffix(b[1:], s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".*middle.*", func(b *testing.B) {
s := []byte("bar.baz")
reMatch := func(b []byte) bool {
return bytes.Contains(b, s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".*middle.+", func(b *testing.B) {
s := []byte("bar.baz")
reMatch := func(b []byte) bool {
return len(b) > len(s) && bytes.Contains(b[:len(b)-1], s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".+middle.*", func(b *testing.B) {
s := []byte("bar.baz")
reMatch := func(b []byte) bool {
return len(b) > len(s) && bytes.Contains(b[1:], s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run(".+middle.+", func(b *testing.B) {
s := []byte("bar.baz")
reMatch := func(b []byte) bool {
return len(b) > len(s)+1 && bytes.Contains(b[1:len(b)-1], s)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
b.Run("reMatchCost", func(b *testing.B) {
re := regexp.MustCompile(`foo[^.]*?\.bar\.baz\.[^.]*?\.ddd`)
reMatch := func(b []byte) bool {
return re.Match(b)
}
suffix := []byte("foo1.bar.baz.sss.ddd")
b.ReportAllocs()
b.SetBytes(int64(1))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reMatch(suffix)
}
})
})
}