mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: add more fine-grained pace limiting for search
This commit is contained in:
parent
16a4b1b20c
commit
dbf3038637
5 changed files with 80 additions and 15 deletions
|
@ -115,7 +115,7 @@ func timeseriesWorker(workerID uint) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunParallel runs in parallel f for all the results from rss.
|
// RunParallel runs f in parallel for all the results from rss.
|
||||||
//
|
//
|
||||||
// f shouldn't hold references to rs after returning.
|
// f shouldn't hold references to rs after returning.
|
||||||
// workerID is the id of the worker goroutine that calls f.
|
// workerID is the id of the worker goroutine that calls f.
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||||
"github.com/VictoriaMetrics/fastcache"
|
"github.com/VictoriaMetrics/fastcache"
|
||||||
|
@ -783,10 +784,15 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
dmis := is.db.getDeletedMetricIDs()
|
dmis := is.db.getDeletedMetricIDs()
|
||||||
|
loopsPaceLimiter := 0
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
||||||
prefix := kb.B
|
prefix := kb.B
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for len(tks) < maxTagKeys && ts.NextItem() {
|
for len(tks) < maxTagKeys && ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
break
|
break
|
||||||
|
@ -855,11 +861,16 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
dmis := is.db.getDeletedMetricIDs()
|
dmis := is.db.getDeletedMetricIDs()
|
||||||
|
loopsPaceLimiter := 0
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
||||||
kb.B = marshalTagValue(kb.B, tagKey)
|
kb.B = marshalTagValue(kb.B, tagKey)
|
||||||
prefix := kb.B
|
prefix := kb.B
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for len(tvs) < maxTagValues && ts.NextItem() {
|
for len(tvs) < maxTagValues && ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
break
|
break
|
||||||
|
@ -923,12 +934,17 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
kb := &is.kb
|
kb := &is.kb
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
|
loopsPaceLimiter := 0
|
||||||
var metricIDsLen uint64
|
var metricIDsLen uint64
|
||||||
// Extract the number of series from ((__name__=value): metricIDs) rows
|
// Extract the number of series from ((__name__=value): metricIDs) rows
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
|
||||||
kb.B = marshalTagValue(kb.B, nil)
|
kb.B = marshalTagValue(kb.B, nil)
|
||||||
ts.Seek(kb.B)
|
ts.Seek(kb.B)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, kb.B) {
|
if !bytes.HasPrefix(item, kb.B) {
|
||||||
break
|
break
|
||||||
|
@ -989,11 +1005,16 @@ func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date ui
|
||||||
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
|
var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
|
||||||
nameEqualBytes := []byte("__name__=")
|
nameEqualBytes := []byte("__name__=")
|
||||||
|
|
||||||
|
loopsPaceLimiter := 0
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
prefix := kb.B
|
prefix := kb.B
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
break
|
break
|
||||||
|
@ -1523,7 +1544,10 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
|
||||||
// Obtain TSID values for the given metricIDs.
|
// Obtain TSID values for the given metricIDs.
|
||||||
tsids := make([]TSID, len(metricIDs))
|
tsids := make([]TSID, len(metricIDs))
|
||||||
i := 0
|
i := 0
|
||||||
for _, metricID := range metricIDs {
|
for loopsPaceLimiter, metricID := range metricIDs {
|
||||||
|
if loopsPaceLimiter&(1<<10) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||||
tsid := &tsids[i]
|
tsid := &tsids[i]
|
||||||
|
@ -1590,7 +1614,10 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
|
||||||
defer kbPool.Put(metricName)
|
defer kbPool.Put(metricName)
|
||||||
mn := GetMetricName()
|
mn := GetMetricName()
|
||||||
defer PutMetricName(mn)
|
defer PutMetricName(mn)
|
||||||
for _, metricID := range sortedMetricIDs {
|
for loopsPaceLimiter, metricID := range sortedMetricIDs {
|
||||||
|
if loopsPaceLimiter&(1<<10) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
var err error
|
var err error
|
||||||
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID)
|
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2063,16 +2090,21 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan all the rows with tf.prefix and call f on every tf match.
|
// Scan all the rows with tf.prefix and call f on every tf match.
|
||||||
loops := 0
|
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
kb := &is.kb
|
kb := &is.kb
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
var prevMatchingSuffix []byte
|
var prevMatchingSuffix []byte
|
||||||
var prevMatch bool
|
var prevMatch bool
|
||||||
|
loops := 0
|
||||||
|
loopsPaceLimiter := 0
|
||||||
prefix := tf.prefix
|
prefix := tf.prefix
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<14) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return nil
|
||||||
|
@ -2188,8 +2220,13 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
|
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
|
||||||
loops := 0
|
loops := 0
|
||||||
|
loopsPaceLimiter := 0
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return nil
|
||||||
|
@ -2221,10 +2258,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
|
maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
|
||||||
loops := 0
|
loops := 0
|
||||||
|
loopsPaceLimiter := 0
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
var sf []uint64
|
var sf []uint64
|
||||||
var metricID uint64
|
var metricID uint64
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<12) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return nil
|
||||||
|
@ -2768,8 +2810,13 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, p
|
||||||
func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
|
func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
|
loopsPaceLimiter := 0
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BlockRef references a Block.
|
// BlockRef references a Block.
|
||||||
|
@ -131,6 +132,8 @@ type Search struct {
|
||||||
err error
|
err error
|
||||||
|
|
||||||
needClosing bool
|
needClosing bool
|
||||||
|
|
||||||
|
loops int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Search) reset() {
|
func (s *Search) reset() {
|
||||||
|
@ -141,6 +144,7 @@ func (s *Search) reset() {
|
||||||
s.ts.reset()
|
s.ts.reset()
|
||||||
s.err = nil
|
s.err = nil
|
||||||
s.needClosing = false
|
s.needClosing = false
|
||||||
|
s.loops = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initializes s from the given storage, tfss and tr.
|
// Init initializes s from the given storage, tfss and tr.
|
||||||
|
@ -194,6 +198,10 @@ func (s *Search) NextMetricBlock() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for s.ts.NextBlock() {
|
for s.ts.NextBlock() {
|
||||||
|
if s.loops&(1<<10) == 0 {
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
s.loops++
|
||||||
tsid := &s.ts.BlockRef.bh.TSID
|
tsid := &s.ts.BlockRef.bh.TSID
|
||||||
var err error
|
var err error
|
||||||
s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
|
s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
|
||||||
|
|
|
@ -863,11 +863,6 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
|
||||||
|
|
||||||
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
||||||
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
||||||
// Make sure that there are enough resources for processing data ingestion before starting the query.
|
|
||||||
// This should prevent from data ingestion starvation when provessing heavy queries.
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 .
|
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
|
||||||
|
|
||||||
// Do not cache tfss -> tsids here, since the caching is performed
|
// Do not cache tfss -> tsids here, since the caching is performed
|
||||||
// on idb level.
|
// on idb level.
|
||||||
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
|
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
|
||||||
|
@ -879,18 +874,27 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
|
||||||
|
|
||||||
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
|
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
|
||||||
//
|
//
|
||||||
|
// It is expected that all the tsdis have the same (accountID, projectID)
|
||||||
|
//
|
||||||
// This should speed-up further searchMetricName calls for metricIDs from tsids.
|
// This should speed-up further searchMetricName calls for metricIDs from tsids.
|
||||||
func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
||||||
|
if len(tsids) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
accountID := tsids[0].AccountID
|
||||||
|
projectID := tsids[0].ProjectID
|
||||||
var metricIDs uint64Sorter
|
var metricIDs uint64Sorter
|
||||||
tsidsMap := make(map[uint64]*TSID)
|
|
||||||
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
|
||||||
for i := range tsids {
|
for i := range tsids {
|
||||||
metricID := tsids[i].MetricID
|
tsid := &tsids[i]
|
||||||
|
if tsid.AccountID != accountID || tsid.ProjectID != projectID {
|
||||||
|
logger.Panicf("BUG: unexpected (accountID, projectID) in tsid=%#v; want accountID=%d, projectID=%d", tsid, accountID, projectID)
|
||||||
|
}
|
||||||
|
metricID := tsid.MetricID
|
||||||
if prefetchedMetricIDs.Has(metricID) {
|
if prefetchedMetricIDs.Has(metricID) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
metricIDs = append(metricIDs, metricID)
|
metricIDs = append(metricIDs, metricID)
|
||||||
tsidsMap[metricID] = &tsids[i]
|
|
||||||
}
|
}
|
||||||
if len(metricIDs) < 500 {
|
if len(metricIDs) < 500 {
|
||||||
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
// It is cheaper to skip pre-fetching and obtain metricNames inline.
|
||||||
|
@ -905,9 +909,11 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
is := idb.getIndexSearch()
|
is := idb.getIndexSearch()
|
||||||
defer idb.putIndexSearch(is)
|
defer idb.putIndexSearch(is)
|
||||||
for _, metricID := range metricIDs {
|
for loops, metricID := range metricIDs {
|
||||||
tsid := tsidsMap[metricID]
|
if loops&(1<<10) == 0 {
|
||||||
metricName, err = is.searchMetricName(metricName[:0], metricID, tsid.AccountID, tsid.ProjectID)
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
}
|
||||||
|
metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err)
|
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Search limits the pace of search calls when there is at least a single in-flight assisted merge.
|
// Search limits the pace of search calls when there is at least a single in-flight assisted merge.
|
||||||
|
//
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
|
||||||
var Search = pacelimiter.New()
|
var Search = pacelimiter.New()
|
||||||
|
|
||||||
// BigMerges limits the pace for big merges when there is at least a single in-flight small merge.
|
// BigMerges limits the pace for big merges when there is at least a single in-flight small merge.
|
||||||
|
//
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
|
||||||
var BigMerges = pacelimiter.New()
|
var BigMerges = pacelimiter.New()
|
||||||
|
|
Loading…
Reference in a new issue