Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
lib/storage: parallelize tag filters execution a bit
This should reduce execution time when a query contains multiple tag filters and each such filter matches a big number of time series.
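The change fans each remaining tag filter out to its own goroutine, caps the concurrency with a small buffered channel used as a semaphore, and protects the shared metricIDs set and the first error behind a mutex. Below is a rough, self-contained sketch of that pattern only; the filter type and the intersectFilters / initial names are invented for illustration and are not part of the VictoriaMetrics code base.

package main

import (
    "fmt"
    "sync"
)

// filter stands in for a single tag filter; it returns the set of matching IDs.
type filter func() (map[uint64]bool, error)

// intersectFilters runs every filter concurrently (at most two at a time) and
// intersects the results into the initial set, mirroring the shape of the change.
func intersectFilters(initial map[uint64]bool, filters []filter) (map[uint64]bool, error) {
    var mu sync.Mutex
    var wg sync.WaitGroup
    var errGlobal error
    result := initial

    // Buffered channel used as a semaphore, like concurrencyCh in the commit.
    concurrencyCh := make(chan struct{}, 2)

    for _, f := range filters {
        f := f
        wg.Add(1)
        go func() {
            concurrencyCh <- struct{}{}
            defer func() {
                <-concurrencyCh
                wg.Done()
            }()
            m, err := f()
            if err != nil {
                mu.Lock()
                if errGlobal == nil {
                    errGlobal = err
                }
                mu.Unlock()
                return
            }
            // Intersect the shared result with this filter's matches.
            mu.Lock()
            for id := range result {
                if !m[id] {
                    delete(result, id)
                }
            }
            mu.Unlock()
        }()
    }
    wg.Wait()
    if errGlobal != nil {
        return nil, errGlobal
    }
    return result, nil
}

func main() {
    initial := map[uint64]bool{1: true, 2: true, 3: true}
    filters := []filter{
        func() (map[uint64]bool, error) { return map[uint64]bool{1: true, 2: true}, nil },
        func() (map[uint64]bool, error) { return map[uint64]bool{2: true, 3: true}, nil },
    }
    ids, err := intersectFilters(initial, filters)
    fmt.Println(ids, err) // prints: map[2:true] <nil>
}

With the two example filters in main, only the IDs matched by every filter survive, which is the same intersection semantics the real code applies per tag filter.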
parent c7ee2fabb8
commit b51c23dc5b

1 changed file with 59 additions and 28 deletions
@@ -2807,7 +2807,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
         return a.tf.Less(b.tf)
     })
 
-    // Populate metricIDs with the first non-negative filter.
+    // Populate metricIDs for the first non-negative filter.
     var tfsPostponed []*tagFilter
     var metricIDs *uint64set.Set
     maxDateMetrics := maxMetrics * 50
@@ -2859,36 +2859,67 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
     }
 
     // Intersect metricIDs with the rest of filters.
+    var mu sync.Mutex
+    var wg sync.WaitGroup
+    var errGlobal error
+    // Limit the number of concurrent goroutines for metricIDs filtering in the hope they reduce the number
+    // of matching metrics to quite low value, so the remaining filters could be matched by metricName.
+    concurrencyCh := make(chan struct{}, 2)
     for i := range tfsRemainingWithCount {
         tfWithCount := tfsRemainingWithCount[i]
-        if n := uint64(metricIDs.Len()); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
-            // It should be faster performing metricName match on the remaining filters
-            // instead of scanning big number of entries in the inverted index for these filters.
-            for i < len(tfsRemainingWithCount) {
-                tfsPostponed = append(tfsPostponed, tfsRemainingWithCount[i].tf)
-                i++
-            }
-            break
-        }
         tf := tfWithCount.tf
-        m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
-        if err != nil {
-            return nil, err
-        }
-        if m.Len() >= maxDateMetrics {
-            // Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
-            tfsPostponed = append(tfsPostponed, tf)
-            continue
-        }
-        if tf.isNegative {
-            metricIDs.Subtract(m)
-        } else {
-            metricIDs.Intersect(m)
-        }
-        if metricIDs.Len() == 0 {
-            // Short circuit - there is no need in applying the remaining filters to empty set.
-            return nil, nil
-        }
+        wg.Add(1)
+        go func() {
+            concurrencyCh <- struct{}{}
+            defer func() {
+                <-concurrencyCh
+                wg.Done()
+            }()
+            mu.Lock()
+            metricIDsLen := metricIDs.Len()
+            mu.Unlock()
+            if metricIDsLen == 0 {
+                // Short circuit - there is no need in applying the remaining filters to empty set.
+                return
+            }
+            if n := uint64(metricIDsLen); n < 1000 || (n < tfWithCount.count/maxIndexScanLoopsPerMetric && n < uint64(maxMetrics)/10) {
+                // It should be faster performing metricName match on the remaining filters
+                // instead of scanning big number of entries in the inverted index for these filters.
+                mu.Lock()
+                tfsPostponed = append(tfsPostponed, tf)
+                mu.Unlock()
+                return
+            }
+            isLocal := is.db.getIndexSearch(is.deadline)
+            m, err := isLocal.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
+            isLocal.db.putIndexSearch(isLocal)
+            if err != nil {
+                mu.Lock()
+                if errGlobal == nil {
+                    errGlobal = err
+                }
+                mu.Unlock()
+                return
+            }
+            if m.Len() >= maxDateMetrics {
+                // Too many time series found by a single tag filter. Postpone applying this filter via metricName match.
+                mu.Lock()
+                tfsPostponed = append(tfsPostponed, tf)
+                mu.Unlock()
+                return
+            }
+            mu.Lock()
+            if tf.isNegative {
+                metricIDs.Subtract(m)
+            } else {
+                metricIDs.Intersect(m)
+            }
+            mu.Unlock()
+        }()
+    }
+    wg.Wait()
+    if errGlobal != nil {
+        return nil, errGlobal
     }
     if len(tfsPostponed) > 0 {
         if n := metricIDs.Len(); n > 50000 && n > maxMetrics/10 {