From fb3d1380ac50d19251738778dd194e654b88dae4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 23 Jul 2020 20:42:57 +0300 Subject: [PATCH] lib/storage: respect `-search.maxQueryDuration` when searching for time series in inverted index Previously the time spent on inverted index search could exceed the configured `-search.maxQueryDuration`. This commit stops searching in inverted index on query timeout. --- app/vmselect/netstorage/netstorage.go | 45 +++++++++---- app/vmstorage/transport/server.go | 39 +++++++---- lib/storage/index_db.go | 96 +++++++++++++++++---------- lib/storage/index_db_test.go | 44 ++++++------ lib/storage/index_db_timing_test.go | 8 +-- lib/storage/search.go | 25 +++++-- lib/storage/search_test.go | 2 +- lib/storage/storage.go | 40 ++++++----- lib/storage/storage_test.go | 12 ++-- 9 files changed, 193 insertions(+), 118 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index ff0ecc7b5f..5fb321c1f9 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1047,10 +1047,10 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int deletedCount += n return nil } - if err := sn.execOnConn("deleteMetrics_v2", f, deadline); err != nil { + if err := sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil { // Try again before giving up. // There is no need in zeroing deletedCount. - if err = sn.execOnConn("deleteMetrics_v2", f, deadline); err != nil { + if err = sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil { return deletedCount, err } } @@ -1067,10 +1067,10 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline) labels = ls return nil } - if err := sn.execOnConn("labels", f, deadline); err != nil { + if err := sn.execOnConn("labels_v2", f, deadline); err != nil { // Try again before giving up. labels = nil - if err = sn.execOnConn("labels", f, deadline); err != nil { + if err = sn.execOnConn("labels_v2", f, deadline); err != nil { return nil, err } } @@ -1087,10 +1087,10 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str labelValues = lvs return nil } - if err := sn.execOnConn("labelValues", f, deadline); err != nil { + if err := sn.execOnConn("labelValues_v2", f, deadline); err != nil { // Try again before giving up. labelValues = nil - if err = sn.execOnConn("labelValues", f, deadline); err != nil { + if err = sn.execOnConn("labelValues_v2", f, deadline); err != nil { return nil, err } } @@ -1107,10 +1107,10 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Dea tagEntries = tes return nil } - if err := sn.execOnConn("labelEntries", f, deadline); err != nil { + if err := sn.execOnConn("labelEntries_v2", f, deadline); err != nil { // Try again before giving up. tagEntries = nil - if err = sn.execOnConn("labelEntries", f, deadline); err != nil { + if err = sn.execOnConn("labelEntries_v2", f, deadline); err != nil { return nil, err } } @@ -1127,10 +1127,10 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui status = st return nil } - if err := sn.execOnConn("tsdbStatus", f, deadline); err != nil { + if err := sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil { // Try again before giving up. status = nil - if err = sn.execOnConn("tsdbStatus", f, deadline); err != nil { + if err = sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil { return nil, err } } @@ -1147,10 +1147,10 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead n = nn return nil } - if err := sn.execOnConn("seriesCount", f, deadline); err != nil { + if err := sn.execOnConn("seriesCount_v2", f, deadline); err != nil { // Try again before giving up. n = 0 - if err = sn.execOnConn("seriesCount", f, deadline); err != nil { + if err = sn.execOnConn("seriesCount_v2", f, deadline); err != nil { return 0, err } } @@ -1167,9 +1167,9 @@ func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestDat blocksRead = n return nil } - if err := sn.execOnConn("search_v3", f, deadline); err != nil && blocksRead == 0 { + if err := sn.execOnConn("search_v4", f, deadline); err != nil && blocksRead == 0 { // Try again before giving up if zero blocks read on the previous attempt. - if err = sn.execOnConn("search_v3", f, deadline); err != nil { + if err = sn.execOnConn("search_v4", f, deadline); err != nil { return err } } @@ -1202,6 +1202,23 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC return fmt.Errorf("cannot send rpcName=%q to the server: %w", rpcName, err) } + // Send the remaining timeout instead of deadline to remote server, since it may have different time. + now := fasttime.UnixTimestamp() + timeout := uint64(0) + if deadline.deadline > now { + timeout = deadline.deadline - now + } + if timeout > (1<<32)-2 { + timeout = (1 << 32) - 2 + } + timeout++ + if err := writeUint32(bc, uint32(timeout)); err != nil { + // Close the connection instead of returning it to the pool, + // since it may be broken. + _ = bc.Close() + return fmt.Errorf("cannot send timeout=%d for rpcName=%q to the server: %w", timeout, rpcName, err) + } + if err := f(bc); err != nil { remoteAddr := bc.RemoteAddr() var er *errRemote diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index c12abea5d0..8df77c5575 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -398,6 +398,9 @@ type vmselectRequestCtx struct { tfss []*storage.TagFilters sr storage.Search mb storage.MetricBlock + + // deadline in unix timestamp seconds for the current request. + deadline uint64 } func (ctx *vmselectRequestCtx) readUint32() (uint32, error) { @@ -502,6 +505,7 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error { } return fmt.Errorf("cannot read rpcName: %w", err) } + rpcName := string(ctx.dataBuf) // Limit the time required for reading request args. if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { @@ -511,20 +515,27 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error { _ = ctx.bc.SetReadDeadline(zeroTime) }() - switch string(ctx.dataBuf) { - case "search_v3": + // Read the timeout for request execution. + timeout, err := ctx.readUint32() + if err != nil { + return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err) + } + ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout) + + switch rpcName { + case "search_v4": return s.processVMSelectSearchQuery(ctx) - case "labelValues": + case "labelValues_v2": return s.processVMSelectLabelValues(ctx) - case "labelEntries": + case "labelEntries_v2": return s.processVMSelectLabelEntries(ctx) - case "labels": + case "labels_v2": return s.processVMSelectLabels(ctx) - case "seriesCount": + case "seriesCount_v2": return s.processVMSelectSeriesCount(ctx) - case "tsdbStatus": + case "tsdbStatus_v2": return s.processVMSelectTSDBStatus(ctx) - case "deleteMetrics_v2": + case "deleteMetrics_v3": return s.processVMSelectDeleteMetrics(ctx) default: return fmt.Errorf("unsupported rpcName: %q", ctx.dataBuf) @@ -584,7 +595,7 @@ func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error { } // Search for tag keys - labels, err := s.storage.SearchTagKeys(accountID, projectID, *maxTagKeysPerSearch) + labels, err := s.storage.SearchTagKeys(accountID, projectID, *maxTagKeysPerSearch, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -632,7 +643,7 @@ func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error { labelName := ctx.dataBuf // Search for tag values - labelValues, err := s.storage.SearchTagValues(accountID, projectID, labelName, *maxTagValuesPerSearch) + labelValues, err := s.storage.SearchTagValues(accountID, projectID, labelName, *maxTagValuesPerSearch, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -676,7 +687,7 @@ func (s *Server) processVMSelectLabelEntries(ctx *vmselectRequestCtx) error { } // Perform the request - labelEntries, err := s.storage.SearchTagEntries(accountID, projectID, *maxTagKeysPerSearch, *maxTagValuesPerSearch) + labelEntries, err := s.storage.SearchTagEntries(accountID, projectID, *maxTagKeysPerSearch, *maxTagValuesPerSearch, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -723,7 +734,7 @@ func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error { } // Execute the request - n, err := s.storage.GetSeriesCount(accountID, projectID) + n, err := s.storage.GetSeriesCount(accountID, projectID, ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -762,7 +773,7 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error { } // Execute the request - status, err := s.storage.GetTSDBStatusForDate(accountID, projectID, uint64(date), int(topN)) + status, err := s.storage.GetTSDBStatusForDate(accountID, projectID, uint64(date), int(topN), ctx.deadline) if err != nil { return ctx.writeErrorMessage(err) } @@ -833,7 +844,7 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error { if err := checkTimeRange(s.storage, tr); err != nil { return ctx.writeErrorMessage(err) } - ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch) + ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline) defer ctx.sr.MustClose() if err := ctx.sr.Error(); err != nil { return ctx.writeErrorMessage(err) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index bf63561d22..474a9fee61 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -20,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" @@ -202,7 +201,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working prevHourMetricIDs: prevHourMetricIDs, } - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) dmis, err := is.loadDeletedMetricIDs() db.putIndexSearch(is) if err != nil { @@ -210,7 +209,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working } db.setDeletedMetricIDs(dmis) - is = db.getIndexSearch() + is = db.getIndexSearch(noDeadline) date, err := is.getStartDateForPerDayInvertedIndex() db.putIndexSearch(is) if err != nil { @@ -221,6 +220,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working return db, nil } +const noDeadline = 1<<64 - 1 + // IndexDBMetrics contains essential metrics for indexDB. type IndexDBMetrics struct { TagCacheSize uint64 @@ -525,7 +526,7 @@ var tagFiltersKeyGen uint64 // // It returns io.EOF if the given mn isn't found locally. func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error { - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) err := is.getTSIDByMetricName(dst, metricName) db.putIndexSearch(is) if err == nil { @@ -548,6 +549,9 @@ type indexSearch struct { kb bytesutil.ByteBuffer mp tagToMetricIDsRowParser + // deadline in unix timestamp seconds for the given search. + deadline uint64 + // tsidByNameMisses and tsidByNameSkips is used for a performance // hack in GetOrCreateTSIDByName. See the comment there. tsidByNameMisses int @@ -586,7 +590,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error return nil } -func (db *indexDB) getIndexSearch() *indexSearch { +func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch { v := db.indexSearchPool.Get() if v == nil { v = &indexSearch{ @@ -595,6 +599,7 @@ func (db *indexDB) getIndexSearch() *indexSearch { } is := v.(*indexSearch) is.ts.Init(db.tb, shouldCacheBlock) + is.deadline = deadline return is } @@ -602,6 +607,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) { is.ts.MustClose() is.kb.Reset() is.mp.Reset() + is.deadline = 0 // Do not reset tsidByNameMisses and tsidByNameSkips, // since they are used in GetOrCreateTSIDByName across call boundaries. @@ -747,12 +753,12 @@ func putIndexItems(ii *indexItems) { var indexItemsPool sync.Pool // SearchTagKeys returns all the tag keys for the given accountID, projectID. -func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([]string, error) { +func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) { // TODO: cache results? tks := make(map[string]struct{}) - is := db.getIndexSearch() + is := db.getIndexSearch(deadline) err := is.searchTagKeys(accountID, projectID, tks, maxTagKeys) db.putIndexSearch(is) if err != nil { @@ -760,7 +766,7 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([ } ok := db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(deadline) err = is.searchTagKeys(accountID, projectID, tks, maxTagKeys) extDB.putIndexSearch(is) }) @@ -790,7 +796,9 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string ts.Seek(prefix) for len(tks) < maxTagKeys && ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item @@ -822,18 +830,18 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string } // SearchTagValues returns all the tag values for the given tagKey -func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) { +func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { // TODO: cache results? tvs := make(map[string]struct{}) - is := db.getIndexSearch() + is := db.getIndexSearch(deadline) err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues) db.putIndexSearch(is) if err != nil { return nil, err } ok := db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(deadline) err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues) extDB.putIndexSearch(is) }) @@ -868,7 +876,9 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri ts.Seek(prefix) for len(tvs) < maxTagValues && ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item @@ -910,8 +920,8 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri // // It includes the deleted series too and may count the same series // up to two times - in db and extDB. -func (db *indexDB) GetSeriesCount(accountID, projectID uint32) (uint64, error) { - is := db.getIndexSearch() +func (db *indexDB) GetSeriesCount(accountID, projectID uint32, deadline uint64) (uint64, error) { + is := db.getIndexSearch(deadline) n, err := is.getSeriesCount(accountID, projectID) db.putIndexSearch(is) if err != nil { @@ -920,7 +930,7 @@ func (db *indexDB) GetSeriesCount(accountID, projectID uint32) (uint64, error) { var nExt uint64 ok := db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(deadline) nExt, err = is.getSeriesCount(accountID, projectID) extDB.putIndexSearch(is) }) @@ -942,7 +952,9 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro ts.Seek(kb.B) for ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return 0, err + } } loopsPaceLimiter++ item := ts.Item @@ -970,8 +982,8 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro } // GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID. -func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) { - is := db.getIndexSearch() +func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { + is := db.getIndexSearch(deadline) status, err := is.getTSDBStatusForDate(accountID, projectID, date, topN) db.putIndexSearch(is) if err != nil { @@ -984,7 +996,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64 // The entries weren't found in the db. Try searching them in extDB. ok := db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(deadline) status, err = is.getTSDBStatusForDate(accountID, projectID, date, topN) extDB.putIndexSearch(is) }) @@ -1012,7 +1024,9 @@ func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date ui ts.Seek(prefix) for ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return nil, err + } } loopsPaceLimiter++ item := ts.Item @@ -1164,7 +1178,7 @@ func (th *topHeap) Pop() interface{} { // searchMetricName appends metric name for the given metricID to dst // and returns the result. func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) { - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) dst, err := is.searchMetricName(dst, metricID, accountID, projectID) db.putIndexSearch(is) @@ -1174,7 +1188,7 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, proj // Try searching in the external indexDB. if db.doExtDB(func(extDB *indexDB) { - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(noDeadline) dst, err = is.searchMetricName(dst, metricID, accountID, projectID) extDB.putIndexSearch(is) }) { @@ -1209,7 +1223,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { MinTimestamp: 0, MaxTimestamp: (1 << 63) - 1, } - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9) db.putIndexSearch(is) if err != nil { @@ -1349,7 +1363,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { } // searchTSIDs returns sorted tsids matching the given tfss over the given tr. -func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { +func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { if len(tfss) == 0 { return nil, nil } @@ -1365,7 +1379,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) } // Slow path - search for tsids in the db and extDB. - is := db.getIndexSearch() + is := db.getIndexSearch(deadline) localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics) db.putIndexSearch(is) if err != nil { @@ -1384,7 +1398,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) extTSIDs = tsids return } - is := extDB.getIndexSearch() + is := extDB.getIndexSearch(deadline) extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics) extDB.putIndexSearch(is) @@ -1546,7 +1560,9 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics i := 0 for loopsPaceLimiter, metricID := range metricIDs { if loopsPaceLimiter&(1<<10) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return nil, err + } } // Try obtaining TSIDs from MetricID->TSID cache. This is much faster // than scanning the mergeset if it contains a lot of metricIDs. @@ -1616,7 +1632,9 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs defer PutMetricName(mn) for loopsPaceLimiter, metricID := range sortedMetricIDs { if loopsPaceLimiter&(1<<10) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } var err error metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID) @@ -2102,7 +2120,9 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, ts.Seek(prefix) for ts.NextItem() { if loopsPaceLimiter&(1<<14) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item @@ -2224,7 +2244,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr ts.Seek(prefix) for metricIDs.Len() < maxMetrics && ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item @@ -2264,7 +2286,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri var metricID uint64 for ts.NextItem() { if loopsPaceLimiter&(1<<12) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item @@ -2374,7 +2398,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac wg.Add(1) go func(date uint64) { defer wg.Done() - isLocal := is.db.getIndexSearch() + isLocal := is.db.getIndexSearch(is.deadline) defer is.db.putIndexSearch(isLocal) m, err := isLocal.getMetricIDsForDate(date, accountID, projectID, maxMetrics) mu.Lock() @@ -2431,7 +2455,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set wg.Add(1) go func(date uint64) { defer wg.Done() - isLocal := is.db.getIndexSearch() + isLocal := is.db.getIndexSearch(is.deadline) defer is.db.putIndexSearch(isLocal) m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics) mu.Lock() @@ -2814,7 +2838,9 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64 ts.Seek(prefix) for ts.NextItem() { if loopsPaceLimiter&(1<<16) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } loopsPaceLimiter++ item := ts.Item diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index cc57999d10..cf72e47a5d 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -629,7 +629,7 @@ func testIndexDBBigMetricName(db *indexDB) error { var mn MetricName var tsid TSID - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) // Try creating too big metric group @@ -690,7 +690,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, var mns []MetricName var tsids []TSID - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) var metricNameBuf []byte @@ -807,7 +807,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Test SearchTagValues - tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, nil, 1e5) + tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, nil, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagValues for __name__: %w", err) } @@ -821,7 +821,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } for i := range mn.Tags { tag := &mn.Tags[i] - tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, tag.Key, 1e5) + tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, tag.Key, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagValues for __name__: %w", err) } @@ -834,7 +834,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // Test SearchTagKeys for k, apKeys := range allKeys { - tks, err := db.SearchTagKeys(k.AccountID, k.ProjectID, 1e5) + tks, err := db.SearchTagKeys(k.AccountID, k.ProjectID, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagKeys: %w", err) } @@ -853,7 +853,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // would return more timeseries than needed. if !isConcurrent { for k, tc := range timeseriesCounters { - n, err := db.GetSeriesCount(k.AccountID, k.ProjectID) + n, err := db.GetSeriesCount(k.AccountID, k.ProjectID, noDeadline) if err != nil { return fmt.Errorf("unexpected error in GetSeriesCount(%v): %w", k, err) } @@ -885,7 +885,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, false); err != nil { return fmt.Errorf("cannot add no-op negative filter: %w", err) } - tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -894,7 +894,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify tag cache. - tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -906,7 +906,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err) } @@ -927,7 +927,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if tfsNew := tfs.Finalize(); len(tfsNew) > 0 { return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err) } @@ -955,7 +955,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, true); err != nil { return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter: %w", err) } @@ -965,7 +965,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err) } @@ -981,7 +981,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by non-existing tag filter: %w", err) } @@ -997,7 +997,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // Search with empty filter. It should match all the results for (accountID, projectID). tfs.Reset(mn.AccountID, mn.ProjectID) - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for common prefix: %w", err) } @@ -1010,7 +1010,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, false, false); err != nil { return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -1027,7 +1027,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -1036,7 +1036,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify empty tfss - tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5) + tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for nil tfss: %w", err) } @@ -1570,7 +1570,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } }() - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) // Create a bunch of per-day time series @@ -1658,7 +1658,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MinTimestamp: int64(now - msecPerHour + 1), MaxTimestamp: int64(now), } - matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline) if err != nil { t.Fatalf("error searching tsids: %v", err) } @@ -1672,7 +1672,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MinTimestamp: int64(now - 2*msecPerHour - 1), MaxTimestamp: int64(now), } - matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline) if err != nil { t.Fatalf("error searching tsids: %v", err) } @@ -1686,7 +1686,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { MaxTimestamp: int64(now), } - matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000) + matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline) if err != nil { t.Fatalf("error searching tsids: %v", err) } @@ -1695,7 +1695,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Check GetTSDBStatusForDate - status, err := db.GetTSDBStatusForDate(accountID, projectID, baseDate, 5) + status, err := db.GetTSDBStatusForDate(accountID, projectID, baseDate, 5, noDeadline) if err != nil { t.Fatalf("error in GetTSDBStatusForDate: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 09efcfeb39..d219f62512 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -98,7 +98,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) { var metricName []byte - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) for i := 0; i < recordsPerLoop; i++ { mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10) @@ -177,7 +177,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { b.ResetTimer() benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) { - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) tfss := []*TagFilters{tfs} tr := TimeRange{ @@ -344,7 +344,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { var tsid TSID var metricName []byte - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) for i := 0; i < recordsCount; i++ { mn.AccountID = uint32(i % accountsCount) @@ -363,7 +363,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { var tsidLocal TSID var metricNameLocal []byte mnLocal := mn - is := db.getIndexSearch() + is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) for pb.Next() { for i := 0; i < recordsPerLoop; i++ { diff --git a/lib/storage/search.go b/lib/storage/search.go index 082d70f347..7cb22c8c50 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) @@ -129,6 +130,9 @@ type Search struct { ts tableSearch + // deadline in unix timestamp seconds for the current search. + deadline uint64 + err error needClosing bool @@ -142,6 +146,7 @@ func (s *Search) reset() { s.storage = nil s.ts.reset() + s.deadline = 0 s.err = nil s.needClosing = false s.loops = 0 @@ -150,17 +155,18 @@ func (s *Search) reset() { // Init initializes s from the given storage, tfss and tr. // // MustClose must be called when the search is done. -func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) { +func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) { if s.needClosing { logger.Panicf("BUG: missing MustClose call before the next call to Init") } s.reset() + s.deadline = deadline s.needClosing = true - tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics) + tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics, deadline) if err == nil { - err = storage.prefetchMetricNames(tsids) + err = storage.prefetchMetricNames(tsids, deadline) } // It is ok to call Init on error from storage.searchTSIDs. // Init must be called before returning because it will fail @@ -199,7 +205,10 @@ func (s *Search) NextMetricBlock() bool { } for s.ts.NextBlock() { if s.loops&(1<<10) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(s.deadline); err != nil { + s.err = err + return false + } } s.loops++ tsid := &s.ts.BlockRef.bh.TSID @@ -401,3 +410,11 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { return src, nil } + +func checkSearchDeadlineAndPace(deadline uint64) error { + if fasttime.UnixTimestamp() > deadline { + return errDeadlineExceeded + } + storagepacelimiter.Search.WaitIfNeeded() + return nil +} diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index d5b781f8a6..5585c90ca8 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -225,7 +225,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun } // Search - s.Init(st, []*TagFilters{tfs}, tr, 1e5) + s.Init(st, []*TagFilters{tfs}, tr, 1e5, noDeadline) var mbs []metricBlock for s.NextMetricBlock() { var b Block diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 431800d1f7..51aa24bc31 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -862,10 +862,10 @@ func nextRetentionDuration(retentionMonths int) time.Duration { } // searchTSIDs returns sorted TSIDs for the given tfss and the given tr. -func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) { +func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { // Do not cache tfss -> tsids here, since the caching is performed // on idb level. - tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics) + tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics, deadline) if err != nil { return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err) } @@ -877,7 +877,7 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) // It is expected that all the tsdis have the same (accountID, projectID) // // This should speed-up further searchMetricName calls for metricIDs from tsids. -func (s *Storage) prefetchMetricNames(tsids []TSID) error { +func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { if len(tsids) == 0 { return nil } @@ -907,11 +907,13 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error { var metricName []byte var err error idb := s.idb() - is := idb.getIndexSearch() + is := idb.getIndexSearch(deadline) defer idb.putIndexSearch(is) for loops, metricID := range metricIDs { if loops&(1<<10) == 0 { - storagepacelimiter.Search.WaitIfNeeded() + if err := checkSearchDeadlineAndPace(is.deadline); err != nil { + return err + } } metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID) if err != nil && err != io.EOF { @@ -926,6 +928,8 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error { return nil } +var errDeadlineExceeded = fmt.Errorf("deadline exceeded") + // DeleteMetrics deletes all the metrics matching the given tfss. // // Returns the number of metrics deleted. @@ -950,19 +954,19 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64, accountID, proje } // SearchTagKeys searches for tag keys for the given (accountID, projectID). -func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([]string, error) { - return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys) +func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) { + return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys, deadline) } // SearchTagValues searches for tag values for the given tagKey in (accountID, projectID). -func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) { - return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues) +func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) { + return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline) } // SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID). -func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int) ([]TagEntry, error) { +func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) { idb := s.idb() - keys, err := idb.SearchTagKeys(accountID, projectID, maxTagKeys) + keys, err := idb.SearchTagKeys(accountID, projectID, maxTagKeys, deadline) if err != nil { return nil, fmt.Errorf("cannot search tag keys: %w", err) } @@ -972,7 +976,7 @@ func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxT tes := make([]TagEntry, len(keys)) for i, key := range keys { - values, err := idb.SearchTagValues(accountID, projectID, []byte(key), maxTagValues) + values, err := idb.SearchTagValues(accountID, projectID, []byte(key), maxTagValues, deadline) if err != nil { return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err) } @@ -996,15 +1000,15 @@ type TagEntry struct { // // It includes the deleted series too and may count the same series // up to two times - in db and extDB. -func (s *Storage) GetSeriesCount(accountID, projectID uint32) (uint64, error) { - return s.idb().GetSeriesCount(accountID, projectID) +func (s *Storage) GetSeriesCount(accountID, projectID uint32, deadline uint64) (uint64, error) { + return s.idb().GetSeriesCount(accountID, projectID, deadline) } // GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb for the given (accountID, projectID). // // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats -func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) { - return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN) +func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) { + return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN, deadline) } // MetricRow is a metric to insert into storage. @@ -1206,7 +1210,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra sort.Slice(pendingMetricRows, func(i, j int) bool { return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName) }) - is := idb.getIndexSearch() + is := idb.getIndexSearch(noDeadline) prevMetricNameRaw = nil for i := range pendingMetricRows { pmr := &pendingMetricRows[i] @@ -1431,7 +1435,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { return a.metricID < b.metricID }) idb := s.idb() - is := idb.getIndexSearch() + is := idb.getIndexSearch(noDeadline) defer idb.putIndexSearch(is) var firstError error prevMetricID = 0 diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 10b9fe9cf2..0f9bbb3fdc 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -489,7 +489,7 @@ func TestStorageDeleteMetrics(t *testing.T) { } // Verify no tag keys exist - tks, err := s.SearchTagKeys(0, 0, 1e5) + tks, err := s.SearchTagKeys(0, 0, 1e5, noDeadline) if err != nil { t.Fatalf("error in SearchTagKeys at the start: %s", err) } @@ -540,7 +540,7 @@ func TestStorageDeleteMetrics(t *testing.T) { }) // Verify no more tag keys exist - tks, err = s.SearchTagKeys(0, 0, 1e5) + tks, err = s.SearchTagKeys(0, 0, 1e5, noDeadline) if err != nil { t.Fatalf("error in SearchTagKeys after the test: %s", err) } @@ -600,7 +600,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { s.debugFlush() // Verify tag values exist - tvs, err := s.SearchTagValues(accountID, projectID, workerTag, 1e5) + tvs, err := s.SearchTagValues(accountID, projectID, workerTag, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err) } @@ -609,7 +609,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { } // Verify tag keys exist - tks, err := s.SearchTagKeys(accountID, projectID, 1e5) + tks, err := s.SearchTagKeys(accountID, projectID, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err) } @@ -625,7 +625,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { metricBlocksCount := func(tfs *TagFilters) int { // Verify the number of blocks n := 0 - sr.Init(s, []*TagFilters{tfs}, tr, 1e5) + sr.Init(s, []*TagFilters{tfs}, tr, 1e5, noDeadline) for sr.NextMetricBlock() { n++ } @@ -673,7 +673,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { if n := metricBlocksCount(tfs); n != 0 { return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n) } - tvs, err = s.SearchTagValues(accountID, projectID, workerTag, 1e5) + tvs, err = s.SearchTagValues(accountID, projectID, workerTag, 1e5, noDeadline) if err != nil { return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err) }