Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git

lib/storage: tune sorting for tag filters

Commit 55952f8f2e (parent 3eae03a337)
2 changed files with 56 additions and 42 deletions
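Overview, as read from the diff below: getMetricIDsForDateAndFilters now sorts tag filters by the execution duration measured on previous queries instead of a per-filter match count. The per-(date, tag filter) cache entry grows from 8 to 16 bytes so it can carry the timestamp of the last query next to the duration; getMetricIDsForDateTagFilter uses that timestamp to avoid rewriting the cache more than once per second and doubles the stored duration for filters that hit the maxMetrics limit. The assumed metricName matching speed is raised from 10000 to 50000 matches per second, the fallback to global search for large postponed sets is dropped, and tagFilter.Less now prefers positive filters over negative ones before considering whether a filter is a regexp.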
@@ -16,6 +16,7 @@ import (
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@@ -2801,33 +2802,38 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 }
 
 func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
-    // Sort tfs by the number of matching filters from previous queries.
+    // Sort tfs by the duration from previous queries.
     // This way we limit the amount of work below by applying more specific filters at first.
-    type tagFilterWithCount struct {
+    type tagFilterWithWeight struct {
         tf *tagFilter
-        seconds float64
+        durationSeconds float64
+        lastQueryTimestamp uint64
     }
-    tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
+    tfsWithWeight := make([]tagFilterWithWeight, len(tfs.tfs))
     kb := &is.kb
     var buf []byte
     for i := range tfs.tfs {
         tf := &tfs.tfs[i]
         kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, is.accountID, is.projectID)
         buf = is.db.durationsPerDateTagFilterCache.Get(buf[:0], kb.B)
-        seconds := float64(0)
-        if len(buf) == 8 {
-            n := encoding.UnmarshalUint64(buf)
-            seconds = math.Float64frombits(n)
+        var lastQueryTimestamp uint64
+        // Assume unknwon tag filters may take up to a second for execution.
+        durationSeconds := float64(1)
+        if len(buf) == 16 {
+            lastQueryTimestamp = encoding.UnmarshalUint64(buf)
+            n := encoding.UnmarshalUint64(buf[8:])
+            durationSeconds = math.Float64frombits(n)
         }
-        tfsWithCount[i] = tagFilterWithCount{
+        tfsWithWeight[i] = tagFilterWithWeight{
             tf: tf,
-            seconds: seconds,
+            durationSeconds: durationSeconds,
+            lastQueryTimestamp: lastQueryTimestamp,
         }
     }
-    sort.Slice(tfsWithCount, func(i, j int) bool {
-        a, b := &tfsWithCount[i], &tfsWithCount[j]
-        if a.seconds != b.seconds {
-            return a.seconds < b.seconds
+    sort.Slice(tfsWithWeight, func(i, j int) bool {
+        a, b := &tfsWithWeight[i], &tfsWithWeight[j]
+        if a.durationSeconds != b.durationSeconds {
+            return a.durationSeconds < b.durationSeconds
         }
         return a.tf.Less(b.tf)
     })
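For readers skimming the hunk above: each tag filter's weight is read from durationsPerDateTagFilterCache, whose value layout changes from 8 bytes (float64 duration bits only) to 16 bytes (unix timestamp of the last query, then the float64 duration bits), with a 1-second default for filters that have never been measured. A minimal standalone sketch of that decoding, using a hypothetical helper name and encoding/binary's big-endian functions in place of lib/encoding:

package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// decodeTagFilterCacheValue is a hypothetical helper mirroring the decoding in
// getMetricIDsForDateAndFilters: a 16-byte entry holds the unix timestamp of the
// last query (8 bytes) followed by the float64 bits of the measured duration
// (8 bytes). Any other length falls back to the 1-second default.
func decodeTagFilterCacheValue(buf []byte) (lastQueryTimestamp uint64, durationSeconds float64) {
    durationSeconds = 1 // assume unknown tag filters may take up to a second
    if len(buf) != 16 {
        return 0, durationSeconds
    }
    lastQueryTimestamp = binary.BigEndian.Uint64(buf[:8])
    durationSeconds = math.Float64frombits(binary.BigEndian.Uint64(buf[8:]))
    return lastQueryTimestamp, durationSeconds
}

func main() {
    buf := make([]byte, 16)
    binary.BigEndian.PutUint64(buf[:8], 1600000000)             // last query timestamp
    binary.BigEndian.PutUint64(buf[8:], math.Float64bits(0.25)) // 0.25s measured duration
    ts, d := decodeTagFilterCacheValue(buf)
    fmt.Println(ts, d) // 1600000000 0.25
}

Filters are then sorted by durationSeconds in ascending order, falling back to tagFilter.Less for ties, so historically cheap filters run first.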
@@ -2836,14 +2842,15 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
     var tfsPostponed []*tagFilter
     var metricIDs *uint64set.Set
     maxDateMetrics := maxMetrics * 50
-    tfsRemainingWithCount := tfsWithCount[:0]
-    for i := range tfsWithCount {
-        tf := tfsWithCount[i].tf
+    tfsRemainingWithWeight := tfsWithWeight[:0]
+    for i := range tfsWithWeight {
+        tfw := tfsWithWeight[i]
+        tf := tfw.tf
         if tf.isNegative {
-            tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
+            tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfw)
             continue
         }
-        m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
+        m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
         if err != nil {
             return nil, err
         }
@@ -2854,8 +2861,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
         }
         metricIDs = m
         i++
-        for i < len(tfsWithCount) {
-            tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
+        for i < len(tfsWithWeight) {
+            tfsRemainingWithWeight = append(tfsRemainingWithWeight, tfsWithWeight[i])
             i++
         }
         break
@@ -2885,21 +2892,21 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
     // when the intial tag filters significantly reduce the number of found metricIDs,
     // so the remaining filters could be performed via much faster metricName matching instead
     // of slow selecting of matching metricIDs.
-    for i := range tfsRemainingWithCount {
-        tfWithCount := tfsRemainingWithCount[i]
-        tf := tfWithCount.tf
+    for i := range tfsRemainingWithWeight {
+        tfw := tfsRemainingWithWeight[i]
+        tf := tfw.tf
         metricIDsLen := metricIDs.Len()
         if metricIDsLen == 0 {
             // Short circuit - there is no need in applying the remaining filters to an empty set.
             break
         }
-        if float64(metricIDsLen)/metricNameMatchesPerSecond < tfWithCount.seconds {
+        if float64(metricIDsLen)/metricNameMatchesPerSecond < tfw.durationSeconds {
             // It should be faster performing metricName match on the remaining filters
             // instead of scanning big number of entries in the inverted index for these filters.
             tfsPostponed = append(tfsPostponed, tf)
             continue
         }
-        m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
+        m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics)
         if err != nil {
             return nil, err
         }
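The postponement check above weighs the estimated cost of metricName matching against the filter's cached duration: matching metricIDsLen candidates by metric name is assumed to take metricIDsLen / metricNameMatchesPerSecond seconds, so a filter whose cached duration exceeds that estimate is deferred. A small sketch of the decision, with a hypothetical function name and the constant value introduced later in this diff:

package main

import "fmt"

// metricNameMatchesPerSecond mirrors the constant tuned by this commit
// (raised from 10000 to 50000 below).
const metricNameMatchesPerSecond = 50000

// shouldPostponeTagFilter is a hypothetical helper reproducing the check in
// getMetricIDsForDateAndFilters: postpone the filter when matching the already
// collected metricIDs by metric name looks cheaper than running the filter
// against the inverted index.
func shouldPostponeTagFilter(metricIDsLen int, durationSeconds float64) bool {
    return float64(metricIDsLen)/metricNameMatchesPerSecond < durationSeconds
}

func main() {
    // 20000 candidate IDs cost an estimated 0.4s of metricName matching,
    // so a filter that historically took 1s is postponed...
    fmt.Println(shouldPostponeTagFilter(20000, 1.0)) // true
    // ...while a filter that historically took 0.1s is still applied
    // through the inverted index.
    fmt.Println(shouldPostponeTagFilter(20000, 0.1)) // false
}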
@@ -2919,11 +2926,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
         return nil, nil
     }
     if len(tfsPostponed) > 0 {
-        if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 {
-            // It will be slow to perform metricName match on this number of time series.
-            // Fall back to global search.
-            return nil, errFallbackToMetricNameMatch
-        }
         // Apply the postponed filters via metricName match.
         var m uint64set.Set
         if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed); err != nil {
@@ -2938,7 +2940,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 //
 // This value is used for determining when matching by metric name must be perfromed instead of matching
 // by the remaining tag filters.
-const metricNameMatchesPerSecond = 10000
+const metricNameMatchesPerSecond = 50000
 
 func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
     ii := getIndexItems()
@@ -3086,7 +3088,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
     return true, nil
 }
 
-func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
+func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
     // Augument tag filter prefix for per-date search instead of global search.
     if !bytes.HasPrefix(tf.prefix, commonPrefix) {
         logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@@ -3102,18 +3104,30 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
     tfNew.prefix = kb.B
     startTime := time.Now()
     metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
-    duration := time.Since(startTime)
+    currentTimestamp := fasttime.UnixTimestamp()
+    if currentTimestamp == lastQueryTimestamp {
+        // The cache already contains quite fresh entry for the current (date, tf).
+        // Do not update it too frequently.
+        return metricIDs, err
+    }
     // Store the duration for tag filter execution in the cache in order to sort tag filters
     // in ascending durations on the next search.
+    duration := time.Since(startTime)
     is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
     if err != nil {
         // Set duration to big value, so the given tag filter will be moved to the end
         // of tag filters on the next search.
         duration = time.Hour
     }
-    seconds := duration.Seconds()
-    n := math.Float64bits(seconds)
-    kb.B = encoding.MarshalUint64(kb.B[:0], n)
+    durationSeconds := duration.Seconds()
+    if metricIDs.Len() >= maxMetrics {
+        // Increase the duration for tag filter matching too many metrics,
+        // So next time it will be applied after filters matching lower number of metrics.
+        durationSeconds *= 2
+    }
+    n := math.Float64bits(durationSeconds)
+    kb.B = encoding.MarshalUint64(kb.B[:0], currentTimestamp)
+    kb.B = encoding.MarshalUint64(kb.B, n)
     is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
     return metricIDs, err
 }
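On the write side, getMetricIDsForDateTagFilter now skips the cache update when the entry was already refreshed during the current second, doubles the stored duration for filters that hit the maxMetrics cap, and prepends the current unix timestamp to the stored value. A compact sketch of that policy under assumed helper names (encodeTagFilterCacheValue is hypothetical, and encoding/binary stands in for lib/encoding):

package main

import (
    "encoding/binary"
    "fmt"
    "math"
    "time"
)

// encodeTagFilterCacheValue is a hypothetical helper condensing the cache-update
// policy from getMetricIDsForDateTagFilter. It returns nil when the cached entry
// was already refreshed during the current second, so the caller can skip the write.
func encodeTagFilterCacheValue(lastQueryTimestamp, currentTimestamp uint64, duration time.Duration, searchErr error, tooManyMetrics bool) []byte {
    if currentTimestamp == lastQueryTimestamp {
        // The cache already contains a fresh entry for this (date, tf);
        // do not update it too frequently.
        return nil
    }
    if searchErr != nil {
        // Push failing filters to the end of the ordering on the next search.
        duration = time.Hour
    }
    durationSeconds := duration.Seconds()
    if tooManyMetrics {
        // Penalize filters matching too many metrics, so next time they run
        // after filters matching fewer metrics.
        durationSeconds *= 2
    }
    buf := make([]byte, 16)
    binary.BigEndian.PutUint64(buf[:8], currentTimestamp)
    binary.BigEndian.PutUint64(buf[8:], math.Float64bits(durationSeconds))
    return buf
}

func main() {
    v := encodeTagFilterCacheValue(1600000000, 1600000001, 250*time.Millisecond, nil, true)
    fmt.Println(len(v), math.Float64frombits(binary.BigEndian.Uint64(v[8:]))) // 16 0.5
}

Throttling on whole-second timestamps means each (date, tf) entry is rewritten at most once per second, which keeps the fast cache from churning on hot filters.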
@@ -255,12 +255,12 @@ func (tf *tagFilter) Less(other *tagFilter) bool {
     if tf.matchCost != other.matchCost {
         return tf.matchCost < other.matchCost
     }
-    if tf.isRegexp != other.isRegexp {
-        return !tf.isRegexp
-    }
     if tf.isNegative != other.isNegative {
         return !tf.isNegative
     }
+    if tf.isRegexp != other.isRegexp {
+        return !tf.isRegexp
+    }
     if len(tf.orSuffixes) != len(other.orSuffixes) {
         return len(tf.orSuffixes) < len(other.orSuffixes)
     }
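This last hunk sits in the second changed file (presumably lib/storage/tag_filters.go) and reorders the tie-breaking in tagFilter.Less: after matchCost, positive filters now sort before negative ones, and only then do non-regexp filters win over regexps. A simplified comparator over a stripped-down struct, purely to illustrate the new precedence (not the real tagFilter type):

package main

import (
    "fmt"
    "sort"
)

// fakeFilter is a stripped-down stand-in for tagFilter, carrying only the
// fields involved in the reordered comparison.
type fakeFilter struct {
    name       string
    matchCost  uint64
    isNegative bool
    isRegexp   bool
    orSuffixes int
}

// less mirrors the new precedence: matchCost, then non-negative before negative,
// then non-regexp before regexp, then fewer orSuffixes.
func less(a, b *fakeFilter) bool {
    if a.matchCost != b.matchCost {
        return a.matchCost < b.matchCost
    }
    if a.isNegative != b.isNegative {
        return !a.isNegative
    }
    if a.isRegexp != b.isRegexp {
        return !a.isRegexp
    }
    return a.orSuffixes < b.orSuffixes
}

func main() {
    fs := []fakeFilter{
        {name: "negative-exact", matchCost: 1, isNegative: true},
        {name: "positive-regexp", matchCost: 1, isRegexp: true},
        {name: "positive-exact", matchCost: 1},
    }
    sort.Slice(fs, func(i, j int) bool { return less(&fs[i], &fs[j]) })
    for _, f := range fs {
        fmt.Println(f.name)
    }
    // Output order: positive-exact, positive-regexp, negative-exact
}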