diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 70e585043c..ff0ecc7b5f 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -115,7 +115,7 @@ func timeseriesWorker(workerID uint) { } } -// RunParallel runs in parallel f for all the results from rss. +// RunParallel runs f in parallel for all the results from rss. // // f shouldn't hold references to rs after returning. // workerID is the id of the worker goroutine that calls f. diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index a480b494fa..bf63561d22 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -20,6 +20,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" @@ -783,10 +784,15 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string mp := &is.mp mp.Reset() dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) prefix := kb.B ts.Seek(prefix) for len(tks) < maxTagKeys && ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { break @@ -855,11 +861,16 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri mp := &is.mp mp.Reset() dmis := is.db.getDeletedMetricIDs() + loopsPaceLimiter := 0 kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) kb.B = marshalTagValue(kb.B, tagKey) prefix := kb.B ts.Seek(prefix) for len(tvs) < maxTagValues && 
ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { break @@ -923,12 +934,17 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro ts := &is.ts kb := &is.kb mp := &is.mp + loopsPaceLimiter := 0 var metricIDsLen uint64 // Extract the number of series from ((__name__=value): metricIDs) rows kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID) kb.B = marshalTagValue(kb.B, nil) ts.Seek(kb.B) for ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, kb.B) { break @@ -989,11 +1005,16 @@ func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date ui var labelValueCountByLabelName, seriesCountByLabelValuePair uint64 nameEqualBytes := []byte("__name__=") + loopsPaceLimiter := 0 kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID) kb.B = encoding.MarshalUint64(kb.B, date) prefix := kb.B ts.Seek(prefix) for ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { break @@ -1523,7 +1544,10 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics // Obtain TSID values for the given metricIDs. tsids := make([]TSID, len(metricIDs)) i := 0 - for _, metricID := range metricIDs { + for loopsPaceLimiter, metricID := range metricIDs { + if loopsPaceLimiter&(1<<10) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } // Try obtaining TSIDs from MetricID->TSID cache. This is much faster // than scanning the mergeset if it contains a lot of metricIDs. 
tsid := &tsids[i] @@ -1590,7 +1614,10 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs defer kbPool.Put(metricName) mn := GetMetricName() defer PutMetricName(mn) - for _, metricID := range sortedMetricIDs { + for loopsPaceLimiter, metricID := range sortedMetricIDs { + if loopsPaceLimiter&(1<<10) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } var err error metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID) if err != nil { @@ -2063,16 +2090,21 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, } // Scan all the rows with tf.prefix and call f on every tf match. - loops := 0 ts := &is.ts kb := &is.kb mp := &is.mp mp.Reset() var prevMatchingSuffix []byte var prevMatch bool + loops := 0 + loopsPaceLimiter := 0 prefix := tf.prefix ts.Seek(prefix) for ts.NextItem() { + if loopsPaceLimiter&(1<<14) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { return nil @@ -2188,8 +2220,13 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr mp.Reset() maxLoops := maxMetrics * maxIndexScanLoopsPerMetric loops := 0 + loopsPaceLimiter := 0 ts.Seek(prefix) for metricIDs.Len() < maxMetrics && ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { return nil @@ -2221,10 +2258,15 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri mp.Reset() maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric loops := 0 + loopsPaceLimiter := 0 ts.Seek(prefix) var sf []uint64 var metricID uint64 for ts.NextItem() { + if loopsPaceLimiter&(1<<12) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { return nil @@ -2768,8 +2810,13 @@ func (is *indexSearch) 
updateMetricIDsAll(metricIDs *uint64set.Set, accountID, p func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error { ts := &is.ts mp := &is.mp + loopsPaceLimiter := 0 ts.Seek(prefix) for ts.NextItem() { + if loopsPaceLimiter&(1<<16) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { return nil diff --git a/lib/storage/search.go b/lib/storage/search.go index 3a2929c913..082d70f347 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) // BlockRef references a Block. @@ -131,6 +132,8 @@ type Search struct { err error needClosing bool + + loops int } func (s *Search) reset() { @@ -141,6 +144,7 @@ func (s *Search) reset() { s.ts.reset() s.err = nil s.needClosing = false + s.loops = 0 } // Init initializes s from the given storage, tfss and tr. @@ -194,6 +198,10 @@ func (s *Search) NextMetricBlock() bool { return false } for s.ts.NextBlock() { + if s.loops&(1<<10) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + s.loops++ tsid := &s.ts.BlockRef.bh.TSID var err error s.MetricBlockRef.MetricName, err = s.storage.searchMetricName(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 7d1f8fbade..431800d1f7 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -863,11 +863,6 @@ func nextRetentionDuration(retentionMonths int) time.Duration { // searchTSIDs returns sorted TSIDs for the given tfss and the given tr. 
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { - // Make sure that there are enough resources for processing data ingestion before starting the query. - // This should prevent from data ingestion starvation when provessing heavy queries. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 . - storagepacelimiter.Search.WaitIfNeeded() - // Do not cache tfss -> tsids here, since the caching is performed // on idb level. tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) @@ -879,18 +874,27 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. // +// It is expected that all the tsids have the same (accountID, projectID) +// // This should speed-up further searchMetricName calls for metricIDs from tsids. func (s *Storage) prefetchMetricNames(tsids []TSID) error { + if len(tsids) == 0 { + return nil + } + accountID := tsids[0].AccountID + projectID := tsids[0].ProjectID var metricIDs uint64Sorter - tsidsMap := make(map[uint64]*TSID) prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) for i := range tsids { - metricID := tsids[i].MetricID + tsid := &tsids[i] + if tsid.AccountID != accountID || tsid.ProjectID != projectID { + logger.Panicf("BUG: unexpected (accountID, projectID) in tsid=%#v; want accountID=%d, projectID=%d", tsid, accountID, projectID) + } + metricID := tsid.MetricID if prefetchedMetricIDs.Has(metricID) { continue } metricIDs = append(metricIDs, metricID) - tsidsMap[metricID] = &tsids[i] } if len(metricIDs) < 500 { // It is cheaper to skip pre-fetching and obtain metricNames inline. 
@@ -905,9 +909,11 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error { idb := s.idb() is := idb.getIndexSearch() defer idb.putIndexSearch(is) - for _, metricID := range metricIDs { - tsid := tsidsMap[metricID] - metricName, err = is.searchMetricName(metricName[:0], metricID, tsid.AccountID, tsid.ProjectID) + for loops, metricID := range metricIDs { + if loops&(1<<10) == 0 { + storagepacelimiter.Search.WaitIfNeeded() + } + metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID) if err != nil && err != io.EOF { return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err) } diff --git a/lib/storagepacelimiter/storagepacelimiter.go b/lib/storagepacelimiter/storagepacelimiter.go index 1ca5860a10..4eda4d95f9 100644 --- a/lib/storagepacelimiter/storagepacelimiter.go +++ b/lib/storagepacelimiter/storagepacelimiter.go @@ -5,7 +5,11 @@ import ( ) // Search limits the pace of search calls when there is at least a single in-flight assisted merge. +// +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 var Search = pacelimiter.New() // BigMerges limits the pace for big merges when there is at least a single in-flight small merge. +// +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 var BigMerges = pacelimiter.New()