lib/storage: respect -search.maxQueryDuration when searching for time series in inverted index

Previously the time spent in the inverted index search could exceed the configured `-search.maxQueryDuration`.
This commit stops searching the inverted index when the query deadline is exceeded.
This commit is contained in:
Aliaksandr Valialkin 2020-07-23 20:42:57 +03:00
parent 2a45871823
commit 039c9d2441
9 changed files with 153 additions and 106 deletions

View file

@ -426,7 +426,7 @@ func GetLabels(deadline Deadline) ([]string, error) {
if deadline.Exceeded() { if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
} }
labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch) labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch, deadline.deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("error during labels search: %w", err) return nil, fmt.Errorf("error during labels search: %w", err)
} }
@ -455,7 +455,7 @@ func GetLabelValues(labelName string, deadline Deadline) ([]string, error) {
} }
// Search for tag values // Search for tag values
labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch) labelValues, err := vmstorage.SearchTagValues([]byte(labelName), *maxTagValuesPerSearch, deadline.deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err) return nil, fmt.Errorf("error during label values search for labelName=%q: %w", labelName, err)
} }
@ -471,7 +471,7 @@ func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) {
if deadline.Exceeded() { if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
} }
labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch) labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch, deadline.deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("error during label entries request: %w", err) return nil, fmt.Errorf("error during label entries request: %w", err)
} }
@ -501,7 +501,7 @@ func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TS
if deadline.Exceeded() { if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
} }
status, err := vmstorage.GetTSDBStatusForDate(date, topN) status, err := vmstorage.GetTSDBStatusForDate(date, topN, deadline.deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("error during tsdb status request: %w", err) return nil, fmt.Errorf("error during tsdb status request: %w", err)
} }
@ -513,7 +513,7 @@ func GetSeriesCount(deadline Deadline) (uint64, error) {
if deadline.Exceeded() { if deadline.Exceeded() {
return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
} }
n, err := vmstorage.GetSeriesCount() n, err := vmstorage.GetSeriesCount(deadline.deadline)
if err != nil { if err != nil {
return 0, fmt.Errorf("error during series count request: %w", err) return 0, fmt.Errorf("error during series count request: %w", err)
} }
@ -560,7 +560,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
defer vmstorage.WG.Done() defer vmstorage.WG.Done()
sr := getStorageSearch() sr := getStorageSearch()
sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch) sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.deadline)
m := make(map[string][]storage.BlockRef) m := make(map[string][]storage.BlockRef)
var orderedMetricNames []string var orderedMetricNames []string

View file

@ -117,41 +117,41 @@ func DeleteMetrics(tfss []*storage.TagFilters) (int, error) {
} }
// SearchTagKeys searches for tag keys // SearchTagKeys searches for tag keys
func SearchTagKeys(maxTagKeys int) ([]string, error) { func SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
WG.Add(1) WG.Add(1)
keys, err := Storage.SearchTagKeys(maxTagKeys) keys, err := Storage.SearchTagKeys(maxTagKeys, deadline)
WG.Done() WG.Done()
return keys, err return keys, err
} }
// SearchTagValues searches for tag values for the given tagKey // SearchTagValues searches for tag values for the given tagKey
func SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) { func SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
WG.Add(1) WG.Add(1)
values, err := Storage.SearchTagValues(tagKey, maxTagValues) values, err := Storage.SearchTagValues(tagKey, maxTagValues, deadline)
WG.Done() WG.Done()
return values, err return values, err
} }
// SearchTagEntries searches for tag entries. // SearchTagEntries searches for tag entries.
func SearchTagEntries(maxTagKeys, maxTagValues int) ([]storage.TagEntry, error) { func SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]storage.TagEntry, error) {
WG.Add(1) WG.Add(1)
tagEntries, err := Storage.SearchTagEntries(maxTagKeys, maxTagValues) tagEntries, err := Storage.SearchTagEntries(maxTagKeys, maxTagValues, deadline)
WG.Done() WG.Done()
return tagEntries, err return tagEntries, err
} }
// GetTSDBStatusForDate returns TSDB status for the given date. // GetTSDBStatusForDate returns TSDB status for the given date.
func GetTSDBStatusForDate(date uint64, topN int) (*storage.TSDBStatus, error) { func GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*storage.TSDBStatus, error) {
WG.Add(1) WG.Add(1)
status, err := Storage.GetTSDBStatusForDate(date, topN) status, err := Storage.GetTSDBStatusForDate(date, topN, deadline)
WG.Done() WG.Done()
return status, err return status, err
} }
// GetSeriesCount returns the number of time series in the storage. // GetSeriesCount returns the number of time series in the storage.
func GetSeriesCount() (uint64, error) { func GetSeriesCount(deadline uint64) (uint64, error) {
WG.Add(1) WG.Add(1)
n, err := Storage.GetSeriesCount() n, err := Storage.GetSeriesCount(deadline)
WG.Done() WG.Done()
return n, err return n, err
} }

View file

@ -20,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
@ -202,7 +201,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
prevHourMetricIDs: prevHourMetricIDs, prevHourMetricIDs: prevHourMetricIDs,
} }
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
dmis, err := is.loadDeletedMetricIDs() dmis, err := is.loadDeletedMetricIDs()
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -210,7 +209,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
} }
db.setDeletedMetricIDs(dmis) db.setDeletedMetricIDs(dmis)
is = db.getIndexSearch() is = db.getIndexSearch(noDeadline)
date, err := is.getStartDateForPerDayInvertedIndex() date, err := is.getStartDateForPerDayInvertedIndex()
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -221,6 +220,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
return db, nil return db, nil
} }
const noDeadline = 1<<64 - 1
// IndexDBMetrics contains essential metrics for indexDB. // IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct { type IndexDBMetrics struct {
TagCacheSize uint64 TagCacheSize uint64
@ -512,7 +513,7 @@ var tagFiltersKeyGen uint64
// //
// It returns io.EOF if the given mn isn't found locally. // It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error { func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
err := is.getTSIDByMetricName(dst, metricName) err := is.getTSIDByMetricName(dst, metricName)
db.putIndexSearch(is) db.putIndexSearch(is)
if err == nil { if err == nil {
@ -535,6 +536,9 @@ type indexSearch struct {
kb bytesutil.ByteBuffer kb bytesutil.ByteBuffer
mp tagToMetricIDsRowParser mp tagToMetricIDsRowParser
// deadline in unix timestamp seconds for the given search.
deadline uint64
// tsidByNameMisses and tsidByNameSkips is used for a performance // tsidByNameMisses and tsidByNameSkips is used for a performance
// hack in GetOrCreateTSIDByName. See the comment there. // hack in GetOrCreateTSIDByName. See the comment there.
tsidByNameMisses int tsidByNameMisses int
@ -573,7 +577,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error
return nil return nil
} }
func (db *indexDB) getIndexSearch() *indexSearch { func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
v := db.indexSearchPool.Get() v := db.indexSearchPool.Get()
if v == nil { if v == nil {
v = &indexSearch{ v = &indexSearch{
@ -582,6 +586,7 @@ func (db *indexDB) getIndexSearch() *indexSearch {
} }
is := v.(*indexSearch) is := v.(*indexSearch)
is.ts.Init(db.tb, shouldCacheBlock) is.ts.Init(db.tb, shouldCacheBlock)
is.deadline = deadline
return is return is
} }
@ -589,6 +594,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
is.ts.MustClose() is.ts.MustClose()
is.kb.Reset() is.kb.Reset()
is.mp.Reset() is.mp.Reset()
is.deadline = 0
// Do not reset tsidByNameMisses and tsidByNameSkips, // Do not reset tsidByNameMisses and tsidByNameSkips,
// since they are used in GetOrCreateTSIDByName across call boundaries. // since they are used in GetOrCreateTSIDByName across call boundaries.
@ -732,12 +738,12 @@ func putIndexItems(ii *indexItems) {
var indexItemsPool sync.Pool var indexItemsPool sync.Pool
// SearchTagKeys returns all the tag keys. // SearchTagKeys returns all the tag keys.
func (db *indexDB) SearchTagKeys(maxTagKeys int) ([]string, error) { func (db *indexDB) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
// TODO: cache results? // TODO: cache results?
tks := make(map[string]struct{}) tks := make(map[string]struct{})
is := db.getIndexSearch() is := db.getIndexSearch(deadline)
err := is.searchTagKeys(tks, maxTagKeys) err := is.searchTagKeys(tks, maxTagKeys)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -745,7 +751,7 @@ func (db *indexDB) SearchTagKeys(maxTagKeys int) ([]string, error) {
} }
ok := db.doExtDB(func(extDB *indexDB) { ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch(deadline)
err = is.searchTagKeys(tks, maxTagKeys) err = is.searchTagKeys(tks, maxTagKeys)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) })
@ -775,7 +781,9 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
ts.Seek(prefix) ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() { for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -807,18 +815,18 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
} }
// SearchTagValues returns all the tag values for the given tagKey // SearchTagValues returns all the tag values for the given tagKey
func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) { func (db *indexDB) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
// TODO: cache results? // TODO: cache results?
tvs := make(map[string]struct{}) tvs := make(map[string]struct{})
is := db.getIndexSearch() is := db.getIndexSearch(deadline)
err := is.searchTagValues(tvs, tagKey, maxTagValues) err := is.searchTagValues(tvs, tagKey, maxTagValues)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ok := db.doExtDB(func(extDB *indexDB) { ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch(deadline)
err = is.searchTagValues(tvs, tagKey, maxTagValues) err = is.searchTagValues(tvs, tagKey, maxTagValues)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) })
@ -853,7 +861,9 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
ts.Seek(prefix) ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() { for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -895,8 +905,8 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
// //
// It includes the deleted series too and may count the same series // It includes the deleted series too and may count the same series
// up to two times - in db and extDB. // up to two times - in db and extDB.
func (db *indexDB) GetSeriesCount() (uint64, error) { func (db *indexDB) GetSeriesCount(deadline uint64) (uint64, error) {
is := db.getIndexSearch() is := db.getIndexSearch(deadline)
n, err := is.getSeriesCount() n, err := is.getSeriesCount()
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -905,7 +915,7 @@ func (db *indexDB) GetSeriesCount() (uint64, error) {
var nExt uint64 var nExt uint64
ok := db.doExtDB(func(extDB *indexDB) { ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch(deadline)
nExt, err = is.getSeriesCount() nExt, err = is.getSeriesCount()
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) })
@ -927,7 +937,9 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
ts.Seek(kb.B) ts.Seek(kb.B)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return 0, err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -955,8 +967,8 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
} }
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date. // GetTSDBStatusForDate returns topN entries for tsdb status for the given date.
func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) { func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch() is := db.getIndexSearch(deadline)
status, err := is.getTSDBStatusForDate(date, topN) status, err := is.getTSDBStatusForDate(date, topN)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -969,7 +981,7 @@ func (db *indexDB) GetTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, err
// The entries weren't found in the db. Try searching them in extDB. // The entries weren't found in the db. Try searching them in extDB.
ok := db.doExtDB(func(extDB *indexDB) { ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch(deadline)
status, err = is.getTSDBStatusForDate(date, topN) status, err = is.getTSDBStatusForDate(date, topN)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) })
@ -997,7 +1009,9 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -1149,7 +1163,7 @@ func (th *topHeap) Pop() interface{} {
// searchMetricName appends metric name for the given metricID to dst // searchMetricName appends metric name for the given metricID to dst
// and returns the result. // and returns the result.
func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error) { func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
dst, err := is.searchMetricName(dst, metricID) dst, err := is.searchMetricName(dst, metricID)
db.putIndexSearch(is) db.putIndexSearch(is)
@ -1159,7 +1173,7 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error)
// Try searching in the external indexDB. // Try searching in the external indexDB.
if db.doExtDB(func(extDB *indexDB) { if db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch() is := extDB.getIndexSearch(noDeadline)
dst, err = is.searchMetricName(dst, metricID) dst, err = is.searchMetricName(dst, metricID)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
}) { }) {
@ -1194,7 +1208,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
MinTimestamp: 0, MinTimestamp: 0,
MaxTimestamp: (1 << 63) - 1, MaxTimestamp: (1 << 63) - 1,
} }
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9) metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -1324,7 +1338,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
} }
// searchTSIDs returns sorted tsids matching the given tfss over the given tr. // searchTSIDs returns sorted tsids matching the given tfss over the given tr.
func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
if len(tfss) == 0 { if len(tfss) == 0 {
return nil, nil return nil, nil
} }
@ -1340,7 +1354,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
} }
// Slow path - search for tsids in the db and extDB. // Slow path - search for tsids in the db and extDB.
is := db.getIndexSearch() is := db.getIndexSearch(deadline)
localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics) localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
@ -1359,7 +1373,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
extTSIDs = tsids extTSIDs = tsids
return return
} }
is := extDB.getIndexSearch() is := extDB.getIndexSearch(deadline)
extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics) extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
@ -1519,7 +1533,9 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
i := 0 i := 0
for loopsPaceLimiter, metricID := range metricIDs { for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&(1<<10) == 0 { if loopsPaceLimiter&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
} }
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster // Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs. // than scanning the mergeset if it contains a lot of metricIDs.
@ -1589,7 +1605,9 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
defer PutMetricName(mn) defer PutMetricName(mn)
for loopsPaceLimiter, metricID := range sortedMetricIDs { for loopsPaceLimiter, metricID := range sortedMetricIDs {
if loopsPaceLimiter&(1<<10) == 0 { if loopsPaceLimiter&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
var err error var err error
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID) metricName.B, err = is.searchMetricName(metricName.B[:0], metricID)
@ -2075,7 +2093,9 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<14) == 0 { if loopsPaceLimiter&(1<<14) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -2197,7 +2217,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
ts.Seek(prefix) ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() { for metricIDs.Len() < maxMetrics && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -2237,7 +2259,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
var metricID uint64 var metricID uint64
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<12) == 0 { if loopsPaceLimiter&(1<<12) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item
@ -2347,7 +2371,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
wg.Add(1) wg.Add(1)
go func(date uint64) { go func(date uint64) {
defer wg.Done() defer wg.Done()
isLocal := is.db.getIndexSearch() isLocal := is.db.getIndexSearch(is.deadline)
defer is.db.putIndexSearch(isLocal) defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDate(date, maxMetrics) m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
mu.Lock() mu.Lock()
@ -2404,7 +2428,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
wg.Add(1) wg.Add(1)
go func(date uint64) { go func(date uint64) {
defer wg.Done() defer wg.Done()
isLocal := is.db.getIndexSearch() isLocal := is.db.getIndexSearch(is.deadline)
defer is.db.putIndexSearch(isLocal) defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics) m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
mu.Lock() mu.Lock()
@ -2776,7 +2800,9 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
loopsPaceLimiter++ loopsPaceLimiter++
item := ts.Item item := ts.Item

View file

@ -611,7 +611,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
var mn MetricName var mn MetricName
var tsid TSID var tsid TSID
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
// Try creating too big metric group // Try creating too big metric group
@ -672,7 +672,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
var mns []MetricName var mns []MetricName
var tsids []TSID var tsids []TSID
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
var metricNameBuf []byte var metricNameBuf []byte
@ -773,7 +773,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Test SearchTagValues // Test SearchTagValues
tvs, err := db.SearchTagValues(nil, 1e5) tvs, err := db.SearchTagValues(nil, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagValues for __name__: %w", err) return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
} }
@ -782,7 +782,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
for i := range mn.Tags { for i := range mn.Tags {
tag := &mn.Tags[i] tag := &mn.Tags[i]
tvs, err := db.SearchTagValues(tag.Key, 1e5) tvs, err := db.SearchTagValues(tag.Key, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagValues for __name__: %w", err) return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
} }
@ -794,7 +794,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Test SearchTagKeys // Test SearchTagKeys
tks, err := db.SearchTagKeys(1e5) tks, err := db.SearchTagKeys(1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagKeys: %w", err) return fmt.Errorf("error in SearchTagKeys: %w", err)
} }
@ -811,7 +811,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Concurrent test may create duplicate timeseries, so GetSeriesCount // Concurrent test may create duplicate timeseries, so GetSeriesCount
// would return more timeseries than needed. // would return more timeseries than needed.
if !isConcurrent { if !isConcurrent {
n, err := db.GetSeriesCount() n, err := db.GetSeriesCount(noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("unexpected error in GetSeriesCount(): %w", err) return fmt.Errorf("unexpected error in GetSeriesCount(): %w", err)
} }
@ -842,7 +842,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, false); err != nil { if err := tfs.Add(nil, nil, true, false); err != nil {
return fmt.Errorf("cannot add no-op negative filter: %w", err) return fmt.Errorf("cannot add no-op negative filter: %w", err)
} }
tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err) return fmt.Errorf("cannot search by exact tag filter: %w", err)
} }
@ -851,7 +851,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Verify tag cache. // Verify tag cache.
tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err) return fmt.Errorf("cannot search by exact tag filter: %w", err)
} }
@ -863,7 +863,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil { if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err) return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
} }
@ -884,7 +884,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if tfsNew := tfs.Finalize(); len(tfsNew) > 0 { if tfsNew := tfs.Finalize(); len(tfsNew) > 0 {
return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew) return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err) return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
} }
@ -912,7 +912,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, true); err != nil { if err := tfs.Add(nil, nil, true, true); err != nil {
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err) return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter: %w", err) return fmt.Errorf("cannot search by regexp tag filter: %w", err)
} }
@ -922,7 +922,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil { if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err) return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
} }
@ -938,7 +938,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil { if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err) return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by non-existing tag filter: %w", err) return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
} }
@ -954,7 +954,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Search with empty filter. It should match all the results. // Search with empty filter. It should match all the results.
tfs.Reset() tfs.Reset()
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for common prefix: %w", err) return fmt.Errorf("cannot search for common prefix: %w", err)
} }
@ -967,7 +967,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, false, false); err != nil { if err := tfs.Add(nil, nil, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err) return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err) return fmt.Errorf("cannot search for empty metricGroup: %w", err)
} }
@ -984,7 +984,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil { if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err) return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
} }
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err) return fmt.Errorf("cannot search for empty metricGroup: %w", err)
} }
@ -993,7 +993,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Verify empty tfss // Verify empty tfss
tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5) tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for nil tfss: %w", err) return fmt.Errorf("cannot search for nil tfss: %w", err)
} }
@ -1501,7 +1501,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
} }
}() }()
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
// Create a bunch of per-day time series // Create a bunch of per-day time series
@ -1573,7 +1573,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(now - msecPerHour + 1), MinTimestamp: int64(now - msecPerHour + 1),
MaxTimestamp: int64(now), MaxTimestamp: int64(now),
} }
matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error searching tsids: %v", err) t.Fatalf("error searching tsids: %v", err)
} }
@ -1587,7 +1587,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(now - 2*msecPerHour - 1), MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now), MaxTimestamp: int64(now),
} }
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error searching tsids: %v", err) t.Fatalf("error searching tsids: %v", err)
} }
@ -1601,7 +1601,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MaxTimestamp: int64(now), MaxTimestamp: int64(now),
} }
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error searching tsids: %v", err) t.Fatalf("error searching tsids: %v", err)
} }
@ -1610,7 +1610,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
} }
// Check GetTSDBStatusForDate // Check GetTSDBStatusForDate
status, err := db.GetTSDBStatusForDate(baseDate, 5) status, err := db.GetTSDBStatusForDate(baseDate, 5, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error in GetTSDBStatusForDate: %s", err) t.Fatalf("error in GetTSDBStatusForDate: %s", err)
} }

View file

@ -95,7 +95,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) { func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte var metricName []byte
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
for i := 0; i < recordsPerLoop; i++ { for i := 0; i < recordsPerLoop; i++ {
mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10) mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10)
@ -170,7 +170,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
b.ResetTimer() b.ResetTimer()
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) { benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
tfss := []*TagFilters{tfs} tfss := []*TagFilters{tfs}
tr := TimeRange{ tr := TimeRange{
@ -335,7 +335,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
var tsid TSID var tsid TSID
var metricName []byte var metricName []byte
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ { for i := 0; i < recordsCount; i++ {
mn.sortTags() mn.sortTags()
@ -352,7 +352,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
var tsidLocal TSID var tsidLocal TSID
var metricNameLocal []byte var metricNameLocal []byte
mnLocal := mn mnLocal := mn
is := db.getIndexSearch() is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
for pb.Next() { for pb.Next() {
for i := 0; i < recordsPerLoop; i++ { for i := 0; i < recordsPerLoop; i++ {

View file

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
) )
@ -64,6 +65,9 @@ type Search struct {
ts tableSearch ts tableSearch
// deadline in unix timestamp seconds for the current search.
deadline uint64
err error err error
needClosing bool needClosing bool
@ -77,6 +81,7 @@ func (s *Search) reset() {
s.storage = nil s.storage = nil
s.ts.reset() s.ts.reset()
s.deadline = 0
s.err = nil s.err = nil
s.needClosing = false s.needClosing = false
s.loops = 0 s.loops = 0
@ -85,17 +90,18 @@ func (s *Search) reset() {
// Init initializes s from the given storage, tfss and tr. // Init initializes s from the given storage, tfss and tr.
// //
// MustClose must be called when the search is done. // MustClose must be called when the search is done.
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) { func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) {
if s.needClosing { if s.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init") logger.Panicf("BUG: missing MustClose call before the next call to Init")
} }
s.reset() s.reset()
s.deadline = deadline
s.needClosing = true s.needClosing = true
tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics) tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics, deadline)
if err == nil { if err == nil {
err = storage.prefetchMetricNames(tsids) err = storage.prefetchMetricNames(tsids, deadline)
} }
// It is ok to call Init on error from storage.searchTSIDs. // It is ok to call Init on error from storage.searchTSIDs.
// Init must be called before returning because it will fail // Init must be called before returning because it will fail
@ -134,7 +140,10 @@ func (s *Search) NextMetricBlock() bool {
} }
for s.ts.NextBlock() { for s.ts.NextBlock() {
if s.loops&(1<<10) == 0 { if s.loops&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
s.err = err
return false
}
} }
s.loops++ s.loops++
tsid := &s.ts.BlockRef.bh.TSID tsid := &s.ts.BlockRef.bh.TSID
@ -320,3 +329,11 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
return src, nil return src, nil
} }
func checkSearchDeadlineAndPace(deadline uint64) error {
if fasttime.UnixTimestamp() > deadline {
return errDeadlineExceeded
}
storagepacelimiter.Search.WaitIfNeeded()
return nil
}

View file

@ -218,7 +218,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
} }
// Search // Search
s.Init(st, []*TagFilters{tfs}, tr, 1e5) s.Init(st, []*TagFilters{tfs}, tr, 1e5, noDeadline)
var mbs []metricBlock var mbs []metricBlock
for s.NextMetricBlock() { for s.NextMetricBlock() {
var b Block var b Block

View file

@ -799,10 +799,10 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
} }
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. // searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
// Do not cache tfss -> tsids here, since the caching is performed // Do not cache tfss -> tsids here, since the caching is performed
// on idb level. // on idb level.
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics, deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err) return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err)
} }
@ -812,7 +812,7 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache. // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
// //
// This should speed-up further searchMetricName calls for metricIDs from tsids. // This should speed-up further searchMetricName calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(tsids []TSID) error { func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
if len(tsids) == 0 { if len(tsids) == 0 {
return nil return nil
} }
@ -837,11 +837,13 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
var metricName []byte var metricName []byte
var err error var err error
idb := s.idb() idb := s.idb()
is := idb.getIndexSearch() is := idb.getIndexSearch(deadline)
defer idb.putIndexSearch(is) defer idb.putIndexSearch(is)
for loops, metricID := range metricIDs { for loops, metricID := range metricIDs {
if loops&(1<<10) == 0 { if loops&(1<<10) == 0 {
storagepacelimiter.Search.WaitIfNeeded() if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
} }
metricName, err = is.searchMetricName(metricName[:0], metricID) metricName, err = is.searchMetricName(metricName[:0], metricID)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
@ -856,6 +858,8 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
return nil return nil
} }
var errDeadlineExceeded = fmt.Errorf("deadline exceeded")
// DeleteMetrics deletes all the metrics matching the given tfss. // DeleteMetrics deletes all the metrics matching the given tfss.
// //
// Returns the number of metrics deleted. // Returns the number of metrics deleted.
@ -880,19 +884,19 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64) ([]byte, error)
} }
// SearchTagKeys searches for tag keys // SearchTagKeys searches for tag keys
func (s *Storage) SearchTagKeys(maxTagKeys int) ([]string, error) { func (s *Storage) SearchTagKeys(maxTagKeys int, deadline uint64) ([]string, error) {
return s.idb().SearchTagKeys(maxTagKeys) return s.idb().SearchTagKeys(maxTagKeys, deadline)
} }
// SearchTagValues searches for tag values for the given tagKey // SearchTagValues searches for tag values for the given tagKey
func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int) ([]string, error) { func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
return s.idb().SearchTagValues(tagKey, maxTagValues) return s.idb().SearchTagValues(tagKey, maxTagValues, deadline)
} }
// SearchTagEntries returns a list of (tagName -> tagValues) // SearchTagEntries returns a list of (tagName -> tagValues)
func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int) ([]TagEntry, error) { func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
idb := s.idb() idb := s.idb()
keys, err := idb.SearchTagKeys(maxTagKeys) keys, err := idb.SearchTagKeys(maxTagKeys, deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot search tag keys: %w", err) return nil, fmt.Errorf("cannot search tag keys: %w", err)
} }
@ -902,7 +906,7 @@ func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int) ([]TagEntry, er
tes := make([]TagEntry, len(keys)) tes := make([]TagEntry, len(keys))
for i, key := range keys { for i, key := range keys {
values, err := idb.SearchTagValues([]byte(key), maxTagValues) values, err := idb.SearchTagValues([]byte(key), maxTagValues, deadline)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err) return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err)
} }
@ -926,15 +930,15 @@ type TagEntry struct {
// //
// It includes the deleted series too and may count the same series // It includes the deleted series too and may count the same series
// up to two times - in db and extDB. // up to two times - in db and extDB.
func (s *Storage) GetSeriesCount() (uint64, error) { func (s *Storage) GetSeriesCount(deadline uint64) (uint64, error) {
return s.idb().GetSeriesCount() return s.idb().GetSeriesCount(deadline)
} }
// GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb. // GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb.
// //
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func (s *Storage) GetTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) { func (s *Storage) GetTSDBStatusForDate(date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
return s.idb().GetTSDBStatusForDate(date, topN) return s.idb().GetTSDBStatusForDate(date, topN, deadline)
} }
// MetricRow is a metric to insert into storage. // MetricRow is a metric to insert into storage.
@ -1131,7 +1135,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
sort.Slice(pendingMetricRows, func(i, j int) bool { sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName) return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
}) })
is := idb.getIndexSearch() is := idb.getIndexSearch(noDeadline)
prevMetricNameRaw = nil prevMetricNameRaw = nil
for i := range pendingMetricRows { for i := range pendingMetricRows {
pmr := &pendingMetricRows[i] pmr := &pendingMetricRows[i]
@ -1339,7 +1343,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
return a.metricID < b.metricID return a.metricID < b.metricID
}) })
idb := s.idb() idb := s.idb()
is := idb.getIndexSearch() is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is) defer idb.putIndexSearch(is)
var firstError error var firstError error
prevMetricID = 0 prevMetricID = 0

View file

@ -453,7 +453,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
} }
// Verify no tag keys exist // Verify no tag keys exist
tks, err := s.SearchTagKeys(1e5) tks, err := s.SearchTagKeys(1e5, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error in SearchTagKeys at the start: %s", err) t.Fatalf("error in SearchTagKeys at the start: %s", err)
} }
@ -504,7 +504,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
}) })
// Verify no more tag keys exist // Verify no more tag keys exist
tks, err = s.SearchTagKeys(1e5) tks, err = s.SearchTagKeys(1e5, noDeadline)
if err != nil { if err != nil {
t.Fatalf("error in SearchTagKeys after the test: %s", err) t.Fatalf("error in SearchTagKeys after the test: %s", err)
} }
@ -560,7 +560,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
s.debugFlush() s.debugFlush()
// Verify tag values exist // Verify tag values exist
tvs, err := s.SearchTagValues(workerTag, 1e5) tvs, err := s.SearchTagValues(workerTag, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err) return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err)
} }
@ -569,7 +569,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
} }
// Verify tag keys exist // Verify tag keys exist
tks, err := s.SearchTagKeys(1e5) tks, err := s.SearchTagKeys(1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err) return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err)
} }
@ -585,7 +585,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
metricBlocksCount := func(tfs *TagFilters) int { metricBlocksCount := func(tfs *TagFilters) int {
// Verify the number of blocks // Verify the number of blocks
n := 0 n := 0
sr.Init(s, []*TagFilters{tfs}, tr, 1e5) sr.Init(s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
for sr.NextMetricBlock() { for sr.NextMetricBlock() {
n++ n++
} }
@ -633,7 +633,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
if n := metricBlocksCount(tfs); n != 0 { if n := metricBlocksCount(tfs); n != 0 {
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n) return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
} }
tvs, err = s.SearchTagValues(workerTag, 1e5) tvs, err = s.SearchTagValues(workerTag, 1e5, noDeadline)
if err != nil { if err != nil {
return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err) return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err)
} }