mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: respect -search.maxQueryDuration
when searching for time series in inverted index
Previously the time spent on inverted index search could exceed the configured `-search.maxQueryDuration`. This commit stops searching in inverted index on query timeout.
This commit is contained in:
parent
dbf3038637
commit
fb3d1380ac
9 changed files with 193 additions and 118 deletions
|
@ -1047,10 +1047,10 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int
|
||||||
deletedCount += n
|
deletedCount += n
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("deleteMetrics_v2", f, deadline); err != nil {
|
if err := sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
// There is no need in zeroing deletedCount.
|
// There is no need in zeroing deletedCount.
|
||||||
if err = sn.execOnConn("deleteMetrics_v2", f, deadline); err != nil {
|
if err = sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
||||||
return deletedCount, err
|
return deletedCount, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1067,10 +1067,10 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline)
|
||||||
labels = ls
|
labels = ls
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("labels", f, deadline); err != nil {
|
if err := sn.execOnConn("labels_v2", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
labels = nil
|
labels = nil
|
||||||
if err = sn.execOnConn("labels", f, deadline); err != nil {
|
if err = sn.execOnConn("labels_v2", f, deadline); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1087,10 +1087,10 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str
|
||||||
labelValues = lvs
|
labelValues = lvs
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("labelValues", f, deadline); err != nil {
|
if err := sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
labelValues = nil
|
labelValues = nil
|
||||||
if err = sn.execOnConn("labelValues", f, deadline); err != nil {
|
if err = sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1107,10 +1107,10 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Dea
|
||||||
tagEntries = tes
|
tagEntries = tes
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("labelEntries", f, deadline); err != nil {
|
if err := sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
tagEntries = nil
|
tagEntries = nil
|
||||||
if err = sn.execOnConn("labelEntries", f, deadline); err != nil {
|
if err = sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1127,10 +1127,10 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui
|
||||||
status = st
|
status = st
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("tsdbStatus", f, deadline); err != nil {
|
if err := sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
status = nil
|
status = nil
|
||||||
if err = sn.execOnConn("tsdbStatus", f, deadline); err != nil {
|
if err = sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1147,10 +1147,10 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead
|
||||||
n = nn
|
n = nn
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("seriesCount", f, deadline); err != nil {
|
if err := sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
||||||
// Try again before giving up.
|
// Try again before giving up.
|
||||||
n = 0
|
n = 0
|
||||||
if err = sn.execOnConn("seriesCount", f, deadline); err != nil {
|
if err = sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1167,9 +1167,9 @@ func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestDat
|
||||||
blocksRead = n
|
blocksRead = n
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := sn.execOnConn("search_v3", f, deadline); err != nil && blocksRead == 0 {
|
if err := sn.execOnConn("search_v4", f, deadline); err != nil && blocksRead == 0 {
|
||||||
// Try again before giving up if zero blocks read on the previous attempt.
|
// Try again before giving up if zero blocks read on the previous attempt.
|
||||||
if err = sn.execOnConn("search_v3", f, deadline); err != nil {
|
if err = sn.execOnConn("search_v4", f, deadline); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1202,6 +1202,23 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
|
||||||
return fmt.Errorf("cannot send rpcName=%q to the server: %w", rpcName, err)
|
return fmt.Errorf("cannot send rpcName=%q to the server: %w", rpcName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
|
||||||
|
now := fasttime.UnixTimestamp()
|
||||||
|
timeout := uint64(0)
|
||||||
|
if deadline.deadline > now {
|
||||||
|
timeout = deadline.deadline - now
|
||||||
|
}
|
||||||
|
if timeout > (1<<32)-2 {
|
||||||
|
timeout = (1 << 32) - 2
|
||||||
|
}
|
||||||
|
timeout++
|
||||||
|
if err := writeUint32(bc, uint32(timeout)); err != nil {
|
||||||
|
// Close the connection instead of returning it to the pool,
|
||||||
|
// since it may be broken.
|
||||||
|
_ = bc.Close()
|
||||||
|
return fmt.Errorf("cannot send timeout=%d for rpcName=%q to the server: %w", timeout, rpcName, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := f(bc); err != nil {
|
if err := f(bc); err != nil {
|
||||||
remoteAddr := bc.RemoteAddr()
|
remoteAddr := bc.RemoteAddr()
|
||||||
var er *errRemote
|
var er *errRemote
|
||||||
|
|
|
@ -398,6 +398,9 @@ type vmselectRequestCtx struct {
|
||||||
tfss []*storage.TagFilters
|
tfss []*storage.TagFilters
|
||||||
sr storage.Search
|
sr storage.Search
|
||||||
mb storage.MetricBlock
|
mb storage.MetricBlock
|
||||||
|
|
||||||
|
// deadline in unix timestamp seconds for the current request.
|
||||||
|
deadline uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
|
func (ctx *vmselectRequestCtx) readUint32() (uint32, error) {
|
||||||
|
@ -502,6 +505,7 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
|
||||||
}
|
}
|
||||||
return fmt.Errorf("cannot read rpcName: %w", err)
|
return fmt.Errorf("cannot read rpcName: %w", err)
|
||||||
}
|
}
|
||||||
|
rpcName := string(ctx.dataBuf)
|
||||||
|
|
||||||
// Limit the time required for reading request args.
|
// Limit the time required for reading request args.
|
||||||
if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
|
if err := ctx.bc.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
|
||||||
|
@ -511,20 +515,27 @@ func (s *Server) processVMSelectRequest(ctx *vmselectRequestCtx) error {
|
||||||
_ = ctx.bc.SetReadDeadline(zeroTime)
|
_ = ctx.bc.SetReadDeadline(zeroTime)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
switch string(ctx.dataBuf) {
|
// Read the timeout for request execution.
|
||||||
case "search_v3":
|
timeout, err := ctx.readUint32()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read timeout for the request %q: %w", rpcName, err)
|
||||||
|
}
|
||||||
|
ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout)
|
||||||
|
|
||||||
|
switch rpcName {
|
||||||
|
case "search_v4":
|
||||||
return s.processVMSelectSearchQuery(ctx)
|
return s.processVMSelectSearchQuery(ctx)
|
||||||
case "labelValues":
|
case "labelValues_v2":
|
||||||
return s.processVMSelectLabelValues(ctx)
|
return s.processVMSelectLabelValues(ctx)
|
||||||
case "labelEntries":
|
case "labelEntries_v2":
|
||||||
return s.processVMSelectLabelEntries(ctx)
|
return s.processVMSelectLabelEntries(ctx)
|
||||||
case "labels":
|
case "labels_v2":
|
||||||
return s.processVMSelectLabels(ctx)
|
return s.processVMSelectLabels(ctx)
|
||||||
case "seriesCount":
|
case "seriesCount_v2":
|
||||||
return s.processVMSelectSeriesCount(ctx)
|
return s.processVMSelectSeriesCount(ctx)
|
||||||
case "tsdbStatus":
|
case "tsdbStatus_v2":
|
||||||
return s.processVMSelectTSDBStatus(ctx)
|
return s.processVMSelectTSDBStatus(ctx)
|
||||||
case "deleteMetrics_v2":
|
case "deleteMetrics_v3":
|
||||||
return s.processVMSelectDeleteMetrics(ctx)
|
return s.processVMSelectDeleteMetrics(ctx)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unsupported rpcName: %q", ctx.dataBuf)
|
return fmt.Errorf("unsupported rpcName: %q", ctx.dataBuf)
|
||||||
|
@ -584,7 +595,7 @@ func (s *Server) processVMSelectLabels(ctx *vmselectRequestCtx) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search for tag keys
|
// Search for tag keys
|
||||||
labels, err := s.storage.SearchTagKeys(accountID, projectID, *maxTagKeysPerSearch)
|
labels, err := s.storage.SearchTagKeys(accountID, projectID, *maxTagKeysPerSearch, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
@ -632,7 +643,7 @@ func (s *Server) processVMSelectLabelValues(ctx *vmselectRequestCtx) error {
|
||||||
labelName := ctx.dataBuf
|
labelName := ctx.dataBuf
|
||||||
|
|
||||||
// Search for tag values
|
// Search for tag values
|
||||||
labelValues, err := s.storage.SearchTagValues(accountID, projectID, labelName, *maxTagValuesPerSearch)
|
labelValues, err := s.storage.SearchTagValues(accountID, projectID, labelName, *maxTagValuesPerSearch, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
@ -676,7 +687,7 @@ func (s *Server) processVMSelectLabelEntries(ctx *vmselectRequestCtx) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the request
|
// Perform the request
|
||||||
labelEntries, err := s.storage.SearchTagEntries(accountID, projectID, *maxTagKeysPerSearch, *maxTagValuesPerSearch)
|
labelEntries, err := s.storage.SearchTagEntries(accountID, projectID, *maxTagKeysPerSearch, *maxTagValuesPerSearch, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
@ -723,7 +734,7 @@ func (s *Server) processVMSelectSeriesCount(ctx *vmselectRequestCtx) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the request
|
// Execute the request
|
||||||
n, err := s.storage.GetSeriesCount(accountID, projectID)
|
n, err := s.storage.GetSeriesCount(accountID, projectID, ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
@ -762,7 +773,7 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the request
|
// Execute the request
|
||||||
status, err := s.storage.GetTSDBStatusForDate(accountID, projectID, uint64(date), int(topN))
|
status, err := s.storage.GetTSDBStatusForDate(accountID, projectID, uint64(date), int(topN), ctx.deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
|
@ -833,7 +844,7 @@ func (s *Server) processVMSelectSearchQuery(ctx *vmselectRequestCtx) error {
|
||||||
if err := checkTimeRange(s.storage, tr); err != nil {
|
if err := checkTimeRange(s.storage, tr); err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
}
|
}
|
||||||
ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch)
|
ctx.sr.Init(s.storage, ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline)
|
||||||
defer ctx.sr.MustClose()
|
defer ctx.sr.MustClose()
|
||||||
if err := ctx.sr.Error(); err != nil {
|
if err := ctx.sr.Error(); err != nil {
|
||||||
return ctx.writeErrorMessage(err)
|
return ctx.writeErrorMessage(err)
|
||||||
|
|
|
@ -20,7 +20,6 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||||
"github.com/VictoriaMetrics/fastcache"
|
"github.com/VictoriaMetrics/fastcache"
|
||||||
|
@ -202,7 +201,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
|
||||||
prevHourMetricIDs: prevHourMetricIDs,
|
prevHourMetricIDs: prevHourMetricIDs,
|
||||||
}
|
}
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
dmis, err := is.loadDeletedMetricIDs()
|
dmis, err := is.loadDeletedMetricIDs()
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -210,7 +209,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
|
||||||
}
|
}
|
||||||
db.setDeletedMetricIDs(dmis)
|
db.setDeletedMetricIDs(dmis)
|
||||||
|
|
||||||
is = db.getIndexSearch()
|
is = db.getIndexSearch(noDeadline)
|
||||||
date, err := is.getStartDateForPerDayInvertedIndex()
|
date, err := is.getStartDateForPerDayInvertedIndex()
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -221,6 +220,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const noDeadline = 1<<64 - 1
|
||||||
|
|
||||||
// IndexDBMetrics contains essential metrics for indexDB.
|
// IndexDBMetrics contains essential metrics for indexDB.
|
||||||
type IndexDBMetrics struct {
|
type IndexDBMetrics struct {
|
||||||
TagCacheSize uint64
|
TagCacheSize uint64
|
||||||
|
@ -525,7 +526,7 @@ var tagFiltersKeyGen uint64
|
||||||
//
|
//
|
||||||
// It returns io.EOF if the given mn isn't found locally.
|
// It returns io.EOF if the given mn isn't found locally.
|
||||||
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
|
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
err := is.getTSIDByMetricName(dst, metricName)
|
err := is.getTSIDByMetricName(dst, metricName)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -548,6 +549,9 @@ type indexSearch struct {
|
||||||
kb bytesutil.ByteBuffer
|
kb bytesutil.ByteBuffer
|
||||||
mp tagToMetricIDsRowParser
|
mp tagToMetricIDsRowParser
|
||||||
|
|
||||||
|
// deadline in unix timestamp seconds for the given search.
|
||||||
|
deadline uint64
|
||||||
|
|
||||||
// tsidByNameMisses and tsidByNameSkips is used for a performance
|
// tsidByNameMisses and tsidByNameSkips is used for a performance
|
||||||
// hack in GetOrCreateTSIDByName. See the comment there.
|
// hack in GetOrCreateTSIDByName. See the comment there.
|
||||||
tsidByNameMisses int
|
tsidByNameMisses int
|
||||||
|
@ -586,7 +590,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *indexDB) getIndexSearch() *indexSearch {
|
func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
|
||||||
v := db.indexSearchPool.Get()
|
v := db.indexSearchPool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
v = &indexSearch{
|
v = &indexSearch{
|
||||||
|
@ -595,6 +599,7 @@ func (db *indexDB) getIndexSearch() *indexSearch {
|
||||||
}
|
}
|
||||||
is := v.(*indexSearch)
|
is := v.(*indexSearch)
|
||||||
is.ts.Init(db.tb, shouldCacheBlock)
|
is.ts.Init(db.tb, shouldCacheBlock)
|
||||||
|
is.deadline = deadline
|
||||||
return is
|
return is
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -602,6 +607,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
|
||||||
is.ts.MustClose()
|
is.ts.MustClose()
|
||||||
is.kb.Reset()
|
is.kb.Reset()
|
||||||
is.mp.Reset()
|
is.mp.Reset()
|
||||||
|
is.deadline = 0
|
||||||
|
|
||||||
// Do not reset tsidByNameMisses and tsidByNameSkips,
|
// Do not reset tsidByNameMisses and tsidByNameSkips,
|
||||||
// since they are used in GetOrCreateTSIDByName across call boundaries.
|
// since they are used in GetOrCreateTSIDByName across call boundaries.
|
||||||
|
@ -747,12 +753,12 @@ func putIndexItems(ii *indexItems) {
|
||||||
var indexItemsPool sync.Pool
|
var indexItemsPool sync.Pool
|
||||||
|
|
||||||
// SearchTagKeys returns all the tag keys for the given accountID, projectID.
|
// SearchTagKeys returns all the tag keys for the given accountID, projectID.
|
||||||
func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([]string, error) {
|
func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
// TODO: cache results?
|
// TODO: cache results?
|
||||||
|
|
||||||
tks := make(map[string]struct{})
|
tks := make(map[string]struct{})
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(deadline)
|
||||||
err := is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
|
err := is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -760,7 +766,7 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([
|
||||||
}
|
}
|
||||||
|
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(deadline)
|
||||||
err = is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
|
err = is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
|
@ -790,7 +796,9 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for len(tks) < maxTagKeys && ts.NextItem() {
|
for len(tks) < maxTagKeys && ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -822,18 +830,18 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchTagValues returns all the tag values for the given tagKey
|
// SearchTagValues returns all the tag values for the given tagKey
|
||||||
func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) {
|
func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
// TODO: cache results?
|
// TODO: cache results?
|
||||||
|
|
||||||
tvs := make(map[string]struct{})
|
tvs := make(map[string]struct{})
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(deadline)
|
||||||
err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
|
err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(deadline)
|
||||||
err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
|
err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
|
@ -868,7 +876,9 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for len(tvs) < maxTagValues && ts.NextItem() {
|
for len(tvs) < maxTagValues && ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -910,8 +920,8 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri
|
||||||
//
|
//
|
||||||
// It includes the deleted series too and may count the same series
|
// It includes the deleted series too and may count the same series
|
||||||
// up to two times - in db and extDB.
|
// up to two times - in db and extDB.
|
||||||
func (db *indexDB) GetSeriesCount(accountID, projectID uint32) (uint64, error) {
|
func (db *indexDB) GetSeriesCount(accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(deadline)
|
||||||
n, err := is.getSeriesCount(accountID, projectID)
|
n, err := is.getSeriesCount(accountID, projectID)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -920,7 +930,7 @@ func (db *indexDB) GetSeriesCount(accountID, projectID uint32) (uint64, error) {
|
||||||
|
|
||||||
var nExt uint64
|
var nExt uint64
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(deadline)
|
||||||
nExt, err = is.getSeriesCount(accountID, projectID)
|
nExt, err = is.getSeriesCount(accountID, projectID)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
|
@ -942,7 +952,9 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro
|
||||||
ts.Seek(kb.B)
|
ts.Seek(kb.B)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -970,8 +982,8 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
|
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
|
||||||
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
|
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(deadline)
|
||||||
status, err := is.getTSDBStatusForDate(accountID, projectID, date, topN)
|
status, err := is.getTSDBStatusForDate(accountID, projectID, date, topN)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -984,7 +996,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
|
||||||
|
|
||||||
// The entries weren't found in the db. Try searching them in extDB.
|
// The entries weren't found in the db. Try searching them in extDB.
|
||||||
ok := db.doExtDB(func(extDB *indexDB) {
|
ok := db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(deadline)
|
||||||
status, err = is.getTSDBStatusForDate(accountID, projectID, date, topN)
|
status, err = is.getTSDBStatusForDate(accountID, projectID, date, topN)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
})
|
})
|
||||||
|
@ -1012,7 +1024,9 @@ func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date ui
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -1164,7 +1178,7 @@ func (th *topHeap) Pop() interface{} {
|
||||||
// searchMetricName appends metric name for the given metricID to dst
|
// searchMetricName appends metric name for the given metricID to dst
|
||||||
// and returns the result.
|
// and returns the result.
|
||||||
func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) {
|
func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) {
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
dst, err := is.searchMetricName(dst, metricID, accountID, projectID)
|
dst, err := is.searchMetricName(dst, metricID, accountID, projectID)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
|
|
||||||
|
@ -1174,7 +1188,7 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, proj
|
||||||
|
|
||||||
// Try searching in the external indexDB.
|
// Try searching in the external indexDB.
|
||||||
if db.doExtDB(func(extDB *indexDB) {
|
if db.doExtDB(func(extDB *indexDB) {
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(noDeadline)
|
||||||
dst, err = is.searchMetricName(dst, metricID, accountID, projectID)
|
dst, err = is.searchMetricName(dst, metricID, accountID, projectID)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
}) {
|
}) {
|
||||||
|
@ -1209,7 +1223,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||||
MinTimestamp: 0,
|
MinTimestamp: 0,
|
||||||
MaxTimestamp: (1 << 63) - 1,
|
MaxTimestamp: (1 << 63) - 1,
|
||||||
}
|
}
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
|
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1349,7 +1363,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchTSIDs returns sorted tsids matching the given tfss over the given tr.
|
// searchTSIDs returns sorted tsids matching the given tfss over the given tr.
|
||||||
func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
|
||||||
if len(tfss) == 0 {
|
if len(tfss) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
@ -1365,7 +1379,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - search for tsids in the db and extDB.
|
// Slow path - search for tsids in the db and extDB.
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(deadline)
|
||||||
localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics)
|
localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics)
|
||||||
db.putIndexSearch(is)
|
db.putIndexSearch(is)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1384,7 +1398,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
|
||||||
extTSIDs = tsids
|
extTSIDs = tsids
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
is := extDB.getIndexSearch()
|
is := extDB.getIndexSearch(deadline)
|
||||||
extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics)
|
extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics)
|
||||||
extDB.putIndexSearch(is)
|
extDB.putIndexSearch(is)
|
||||||
|
|
||||||
|
@ -1546,7 +1560,9 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
|
||||||
i := 0
|
i := 0
|
||||||
for loopsPaceLimiter, metricID := range metricIDs {
|
for loopsPaceLimiter, metricID := range metricIDs {
|
||||||
if loopsPaceLimiter&(1<<10) == 0 {
|
if loopsPaceLimiter&(1<<10) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
|
||||||
// than scanning the mergeset if it contains a lot of metricIDs.
|
// than scanning the mergeset if it contains a lot of metricIDs.
|
||||||
|
@ -1616,7 +1632,9 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
|
||||||
defer PutMetricName(mn)
|
defer PutMetricName(mn)
|
||||||
for loopsPaceLimiter, metricID := range sortedMetricIDs {
|
for loopsPaceLimiter, metricID := range sortedMetricIDs {
|
||||||
if loopsPaceLimiter&(1<<10) == 0 {
|
if loopsPaceLimiter&(1<<10) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID)
|
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID)
|
||||||
|
@ -2102,7 +2120,9 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<14) == 0 {
|
if loopsPaceLimiter&(1<<14) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -2224,7 +2244,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -2264,7 +2286,9 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
||||||
var metricID uint64
|
var metricID uint64
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<12) == 0 {
|
if loopsPaceLimiter&(1<<12) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
@ -2374,7 +2398,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(date uint64) {
|
go func(date uint64) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
isLocal := is.db.getIndexSearch()
|
isLocal := is.db.getIndexSearch(is.deadline)
|
||||||
defer is.db.putIndexSearch(isLocal)
|
defer is.db.putIndexSearch(isLocal)
|
||||||
m, err := isLocal.getMetricIDsForDate(date, accountID, projectID, maxMetrics)
|
m, err := isLocal.getMetricIDsForDate(date, accountID, projectID, maxMetrics)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
|
@ -2431,7 +2455,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(date uint64) {
|
go func(date uint64) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
isLocal := is.db.getIndexSearch()
|
isLocal := is.db.getIndexSearch(is.deadline)
|
||||||
defer is.db.putIndexSearch(isLocal)
|
defer is.db.putIndexSearch(isLocal)
|
||||||
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
|
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
|
@ -2814,7 +2838,9 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&(1<<16) == 0 {
|
if loopsPaceLimiter&(1<<16) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
|
|
|
@ -629,7 +629,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
|
||||||
var mn MetricName
|
var mn MetricName
|
||||||
var tsid TSID
|
var tsid TSID
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
|
|
||||||
// Try creating too big metric group
|
// Try creating too big metric group
|
||||||
|
@ -690,7 +690,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
|
||||||
var mns []MetricName
|
var mns []MetricName
|
||||||
var tsids []TSID
|
var tsids []TSID
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
|
|
||||||
var metricNameBuf []byte
|
var metricNameBuf []byte
|
||||||
|
@ -807,7 +807,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test SearchTagValues
|
// Test SearchTagValues
|
||||||
tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, nil, 1e5)
|
tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, nil, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
|
return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -821,7 +821,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
}
|
}
|
||||||
for i := range mn.Tags {
|
for i := range mn.Tags {
|
||||||
tag := &mn.Tags[i]
|
tag := &mn.Tags[i]
|
||||||
tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, tag.Key, 1e5)
|
tvs, err := db.SearchTagValues(mn.AccountID, mn.ProjectID, tag.Key, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
|
return fmt.Errorf("error in SearchTagValues for __name__: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -834,7 +834,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
|
|
||||||
// Test SearchTagKeys
|
// Test SearchTagKeys
|
||||||
for k, apKeys := range allKeys {
|
for k, apKeys := range allKeys {
|
||||||
tks, err := db.SearchTagKeys(k.AccountID, k.ProjectID, 1e5)
|
tks, err := db.SearchTagKeys(k.AccountID, k.ProjectID, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagKeys: %w", err)
|
return fmt.Errorf("error in SearchTagKeys: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -853,7 +853,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
// would return more timeseries than needed.
|
// would return more timeseries than needed.
|
||||||
if !isConcurrent {
|
if !isConcurrent {
|
||||||
for k, tc := range timeseriesCounters {
|
for k, tc := range timeseriesCounters {
|
||||||
n, err := db.GetSeriesCount(k.AccountID, k.ProjectID)
|
n, err := db.GetSeriesCount(k.AccountID, k.ProjectID, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unexpected error in GetSeriesCount(%v): %w", k, err)
|
return fmt.Errorf("unexpected error in GetSeriesCount(%v): %w", k, err)
|
||||||
}
|
}
|
||||||
|
@ -885,7 +885,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, nil, true, false); err != nil {
|
if err := tfs.Add(nil, nil, true, false); err != nil {
|
||||||
return fmt.Errorf("cannot add no-op negative filter: %w", err)
|
return fmt.Errorf("cannot add no-op negative filter: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by exact tag filter: %w", err)
|
return fmt.Errorf("cannot search by exact tag filter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -894,7 +894,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify tag cache.
|
// Verify tag cache.
|
||||||
tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by exact tag filter: %w", err)
|
return fmt.Errorf("cannot search by exact tag filter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -906,7 +906,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
|
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
|
||||||
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
|
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
|
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -927,7 +927,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if tfsNew := tfs.Finalize(); len(tfsNew) > 0 {
|
if tfsNew := tfs.Finalize(); len(tfsNew) > 0 {
|
||||||
return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew)
|
return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
|
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -955,7 +955,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, nil, true, true); err != nil {
|
if err := tfs.Add(nil, nil, true, true); err != nil {
|
||||||
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
|
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by regexp tag filter: %w", err)
|
return fmt.Errorf("cannot search by regexp tag filter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -965,7 +965,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
|
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
|
||||||
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
|
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
|
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -981,7 +981,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
|
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
|
||||||
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
|
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
|
return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -997,7 +997,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
|
|
||||||
// Search with empty filter. It should match all the results for (accountID, projectID).
|
// Search with empty filter. It should match all the results for (accountID, projectID).
|
||||||
tfs.Reset(mn.AccountID, mn.ProjectID)
|
tfs.Reset(mn.AccountID, mn.ProjectID)
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search for common prefix: %w", err)
|
return fmt.Errorf("cannot search for common prefix: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1010,7 +1010,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs.Add(nil, nil, false, false); err != nil {
|
if err := tfs.Add(nil, nil, false, false); err != nil {
|
||||||
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
|
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
|
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1027,7 +1027,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
|
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
|
||||||
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
|
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
|
||||||
}
|
}
|
||||||
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
|
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1036,7 +1036,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify empty tfss
|
// Verify empty tfss
|
||||||
tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5)
|
tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot search for nil tfss: %w", err)
|
return fmt.Errorf("cannot search for nil tfss: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -1570,7 +1570,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
|
|
||||||
// Create a bunch of per-day time series
|
// Create a bunch of per-day time series
|
||||||
|
@ -1658,7 +1658,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
MinTimestamp: int64(now - msecPerHour + 1),
|
MinTimestamp: int64(now - msecPerHour + 1),
|
||||||
MaxTimestamp: int64(now),
|
MaxTimestamp: int64(now),
|
||||||
}
|
}
|
||||||
matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
|
matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error searching tsids: %v", err)
|
t.Fatalf("error searching tsids: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1672,7 +1672,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
MinTimestamp: int64(now - 2*msecPerHour - 1),
|
MinTimestamp: int64(now - 2*msecPerHour - 1),
|
||||||
MaxTimestamp: int64(now),
|
MaxTimestamp: int64(now),
|
||||||
}
|
}
|
||||||
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
|
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error searching tsids: %v", err)
|
t.Fatalf("error searching tsids: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1686,7 +1686,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
MaxTimestamp: int64(now),
|
MaxTimestamp: int64(now),
|
||||||
}
|
}
|
||||||
|
|
||||||
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000)
|
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error searching tsids: %v", err)
|
t.Fatalf("error searching tsids: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1695,7 +1695,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check GetTSDBStatusForDate
|
// Check GetTSDBStatusForDate
|
||||||
status, err := db.GetTSDBStatusForDate(accountID, projectID, baseDate, 5)
|
status, err := db.GetTSDBStatusForDate(accountID, projectID, baseDate, 5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error in GetTSDBStatusForDate: %s", err)
|
t.Fatalf("error in GetTSDBStatusForDate: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,7 +98,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||||
|
|
||||||
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
for i := 0; i < recordsPerLoop; i++ {
|
for i := 0; i < recordsPerLoop; i++ {
|
||||||
mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10)
|
mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10)
|
||||||
|
@ -177,7 +177,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
|
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
tfss := []*TagFilters{tfs}
|
tfss := []*TagFilters{tfs}
|
||||||
tr := TimeRange{
|
tr := TimeRange{
|
||||||
|
@ -344,7 +344,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||||
var tsid TSID
|
var tsid TSID
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
|
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
for i := 0; i < recordsCount; i++ {
|
for i := 0; i < recordsCount; i++ {
|
||||||
mn.AccountID = uint32(i % accountsCount)
|
mn.AccountID = uint32(i % accountsCount)
|
||||||
|
@ -363,7 +363,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||||
var tsidLocal TSID
|
var tsidLocal TSID
|
||||||
var metricNameLocal []byte
|
var metricNameLocal []byte
|
||||||
mnLocal := mn
|
mnLocal := mn
|
||||||
is := db.getIndexSearch()
|
is := db.getIndexSearch(noDeadline)
|
||||||
defer db.putIndexSearch(is)
|
defer db.putIndexSearch(is)
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for i := 0; i < recordsPerLoop; i++ {
|
for i := 0; i < recordsPerLoop; i++ {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
|
||||||
)
|
)
|
||||||
|
@ -129,6 +130,9 @@ type Search struct {
|
||||||
|
|
||||||
ts tableSearch
|
ts tableSearch
|
||||||
|
|
||||||
|
// deadline in unix timestamp seconds for the current search.
|
||||||
|
deadline uint64
|
||||||
|
|
||||||
err error
|
err error
|
||||||
|
|
||||||
needClosing bool
|
needClosing bool
|
||||||
|
@ -142,6 +146,7 @@ func (s *Search) reset() {
|
||||||
|
|
||||||
s.storage = nil
|
s.storage = nil
|
||||||
s.ts.reset()
|
s.ts.reset()
|
||||||
|
s.deadline = 0
|
||||||
s.err = nil
|
s.err = nil
|
||||||
s.needClosing = false
|
s.needClosing = false
|
||||||
s.loops = 0
|
s.loops = 0
|
||||||
|
@ -150,17 +155,18 @@ func (s *Search) reset() {
|
||||||
// Init initializes s from the given storage, tfss and tr.
|
// Init initializes s from the given storage, tfss and tr.
|
||||||
//
|
//
|
||||||
// MustClose must be called when the search is done.
|
// MustClose must be called when the search is done.
|
||||||
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int) {
|
func (s *Search) Init(storage *Storage, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) {
|
||||||
if s.needClosing {
|
if s.needClosing {
|
||||||
logger.Panicf("BUG: missing MustClose call before the next call to Init")
|
logger.Panicf("BUG: missing MustClose call before the next call to Init")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.reset()
|
s.reset()
|
||||||
|
s.deadline = deadline
|
||||||
s.needClosing = true
|
s.needClosing = true
|
||||||
|
|
||||||
tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics)
|
tsids, err := storage.searchTSIDs(tfss, tr, maxMetrics, deadline)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = storage.prefetchMetricNames(tsids)
|
err = storage.prefetchMetricNames(tsids, deadline)
|
||||||
}
|
}
|
||||||
// It is ok to call Init on error from storage.searchTSIDs.
|
// It is ok to call Init on error from storage.searchTSIDs.
|
||||||
// Init must be called before returning because it will fail
|
// Init must be called before returning because it will fail
|
||||||
|
@ -199,7 +205,10 @@ func (s *Search) NextMetricBlock() bool {
|
||||||
}
|
}
|
||||||
for s.ts.NextBlock() {
|
for s.ts.NextBlock() {
|
||||||
if s.loops&(1<<10) == 0 {
|
if s.loops&(1<<10) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
|
||||||
|
s.err = err
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.loops++
|
s.loops++
|
||||||
tsid := &s.ts.BlockRef.bh.TSID
|
tsid := &s.ts.BlockRef.bh.TSID
|
||||||
|
@ -401,3 +410,11 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkSearchDeadlineAndPace(deadline uint64) error {
|
||||||
|
if fasttime.UnixTimestamp() > deadline {
|
||||||
|
return errDeadlineExceeded
|
||||||
|
}
|
||||||
|
storagepacelimiter.Search.WaitIfNeeded()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -225,7 +225,7 @@ func testSearchInternal(st *Storage, tr TimeRange, mrs []MetricRow, accountsCoun
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search
|
// Search
|
||||||
s.Init(st, []*TagFilters{tfs}, tr, 1e5)
|
s.Init(st, []*TagFilters{tfs}, tr, 1e5, noDeadline)
|
||||||
var mbs []metricBlock
|
var mbs []metricBlock
|
||||||
for s.NextMetricBlock() {
|
for s.NextMetricBlock() {
|
||||||
var b Block
|
var b Block
|
||||||
|
|
|
@ -862,10 +862,10 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
||||||
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
|
||||||
// Do not cache tfss -> tsids here, since the caching is performed
|
// Do not cache tfss -> tsids here, since the caching is performed
|
||||||
// on idb level.
|
// on idb level.
|
||||||
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics)
|
tsids, err := s.idb().searchTSIDs(tfss, tr, maxMetrics, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err)
|
return nil, fmt.Errorf("error when searching tsids for tfss %q: %w", tfss, err)
|
||||||
}
|
}
|
||||||
|
@ -877,7 +877,7 @@ func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int)
|
||||||
// It is expected that all the tsdis have the same (accountID, projectID)
|
// It is expected that all the tsdis have the same (accountID, projectID)
|
||||||
//
|
//
|
||||||
// This should speed-up further searchMetricName calls for metricIDs from tsids.
|
// This should speed-up further searchMetricName calls for metricIDs from tsids.
|
||||||
func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
|
||||||
if len(tsids) == 0 {
|
if len(tsids) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -907,11 +907,13 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
var err error
|
var err error
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
is := idb.getIndexSearch()
|
is := idb.getIndexSearch(deadline)
|
||||||
defer idb.putIndexSearch(is)
|
defer idb.putIndexSearch(is)
|
||||||
for loops, metricID := range metricIDs {
|
for loops, metricID := range metricIDs {
|
||||||
if loops&(1<<10) == 0 {
|
if loops&(1<<10) == 0 {
|
||||||
storagepacelimiter.Search.WaitIfNeeded()
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID)
|
metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
|
@ -926,6 +928,8 @@ func (s *Storage) prefetchMetricNames(tsids []TSID) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errDeadlineExceeded = fmt.Errorf("deadline exceeded")
|
||||||
|
|
||||||
// DeleteMetrics deletes all the metrics matching the given tfss.
|
// DeleteMetrics deletes all the metrics matching the given tfss.
|
||||||
//
|
//
|
||||||
// Returns the number of metrics deleted.
|
// Returns the number of metrics deleted.
|
||||||
|
@ -950,19 +954,19 @@ func (s *Storage) searchMetricName(dst []byte, metricID uint64, accountID, proje
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchTagKeys searches for tag keys for the given (accountID, projectID).
|
// SearchTagKeys searches for tag keys for the given (accountID, projectID).
|
||||||
func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int) ([]string, error) {
|
func (s *Storage) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, deadline uint64) ([]string, error) {
|
||||||
return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys)
|
return s.idb().SearchTagKeys(accountID, projectID, maxTagKeys, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchTagValues searches for tag values for the given tagKey in (accountID, projectID).
|
// SearchTagValues searches for tag values for the given tagKey in (accountID, projectID).
|
||||||
func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int) ([]string, error) {
|
func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, maxTagValues int, deadline uint64) ([]string, error) {
|
||||||
return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues)
|
return s.idb().SearchTagValues(accountID, projectID, tagKey, maxTagValues, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID).
|
// SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID).
|
||||||
func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int) ([]TagEntry, error) {
|
func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
keys, err := idb.SearchTagKeys(accountID, projectID, maxTagKeys)
|
keys, err := idb.SearchTagKeys(accountID, projectID, maxTagKeys, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot search tag keys: %w", err)
|
return nil, fmt.Errorf("cannot search tag keys: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -972,7 +976,7 @@ func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxT
|
||||||
|
|
||||||
tes := make([]TagEntry, len(keys))
|
tes := make([]TagEntry, len(keys))
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
values, err := idb.SearchTagValues(accountID, projectID, []byte(key), maxTagValues)
|
values, err := idb.SearchTagValues(accountID, projectID, []byte(key), maxTagValues, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err)
|
return nil, fmt.Errorf("cannot search values for tag %q: %w", key, err)
|
||||||
}
|
}
|
||||||
|
@ -996,15 +1000,15 @@ type TagEntry struct {
|
||||||
//
|
//
|
||||||
// It includes the deleted series too and may count the same series
|
// It includes the deleted series too and may count the same series
|
||||||
// up to two times - in db and extDB.
|
// up to two times - in db and extDB.
|
||||||
func (s *Storage) GetSeriesCount(accountID, projectID uint32) (uint64, error) {
|
func (s *Storage) GetSeriesCount(accountID, projectID uint32, deadline uint64) (uint64, error) {
|
||||||
return s.idb().GetSeriesCount(accountID, projectID)
|
return s.idb().GetSeriesCount(accountID, projectID, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb for the given (accountID, projectID).
|
// GetTSDBStatusForDate returns TSDB status data for /api/v1/status/tsdb for the given (accountID, projectID).
|
||||||
//
|
//
|
||||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
// See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
|
||||||
func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
|
func (s *Storage) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
|
||||||
return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN)
|
return s.idb().GetTSDBStatusForDate(accountID, projectID, date, topN, deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetricRow is a metric to insert into storage.
|
// MetricRow is a metric to insert into storage.
|
||||||
|
@ -1206,7 +1210,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||||
sort.Slice(pendingMetricRows, func(i, j int) bool {
|
sort.Slice(pendingMetricRows, func(i, j int) bool {
|
||||||
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
|
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
|
||||||
})
|
})
|
||||||
is := idb.getIndexSearch()
|
is := idb.getIndexSearch(noDeadline)
|
||||||
prevMetricNameRaw = nil
|
prevMetricNameRaw = nil
|
||||||
for i := range pendingMetricRows {
|
for i := range pendingMetricRows {
|
||||||
pmr := &pendingMetricRows[i]
|
pmr := &pendingMetricRows[i]
|
||||||
|
@ -1431,7 +1435,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
|
||||||
return a.metricID < b.metricID
|
return a.metricID < b.metricID
|
||||||
})
|
})
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
is := idb.getIndexSearch()
|
is := idb.getIndexSearch(noDeadline)
|
||||||
defer idb.putIndexSearch(is)
|
defer idb.putIndexSearch(is)
|
||||||
var firstError error
|
var firstError error
|
||||||
prevMetricID = 0
|
prevMetricID = 0
|
||||||
|
|
|
@ -489,7 +489,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify no tag keys exist
|
// Verify no tag keys exist
|
||||||
tks, err := s.SearchTagKeys(0, 0, 1e5)
|
tks, err := s.SearchTagKeys(0, 0, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error in SearchTagKeys at the start: %s", err)
|
t.Fatalf("error in SearchTagKeys at the start: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -540,7 +540,7 @@ func TestStorageDeleteMetrics(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Verify no more tag keys exist
|
// Verify no more tag keys exist
|
||||||
tks, err = s.SearchTagKeys(0, 0, 1e5)
|
tks, err = s.SearchTagKeys(0, 0, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error in SearchTagKeys after the test: %s", err)
|
t.Fatalf("error in SearchTagKeys after the test: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -600,7 +600,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
||||||
s.debugFlush()
|
s.debugFlush()
|
||||||
|
|
||||||
// Verify tag values exist
|
// Verify tag values exist
|
||||||
tvs, err := s.SearchTagValues(accountID, projectID, workerTag, 1e5)
|
tvs, err := s.SearchTagValues(accountID, projectID, workerTag, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err)
|
return fmt.Errorf("error in SearchTagValues before metrics removal: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -609,7 +609,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify tag keys exist
|
// Verify tag keys exist
|
||||||
tks, err := s.SearchTagKeys(accountID, projectID, 1e5)
|
tks, err := s.SearchTagKeys(accountID, projectID, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err)
|
return fmt.Errorf("error in SearchTagKeys before metrics removal: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -625,7 +625,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
||||||
metricBlocksCount := func(tfs *TagFilters) int {
|
metricBlocksCount := func(tfs *TagFilters) int {
|
||||||
// Verify the number of blocks
|
// Verify the number of blocks
|
||||||
n := 0
|
n := 0
|
||||||
sr.Init(s, []*TagFilters{tfs}, tr, 1e5)
|
sr.Init(s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
|
||||||
for sr.NextMetricBlock() {
|
for sr.NextMetricBlock() {
|
||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
|
@ -673,7 +673,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error {
|
||||||
if n := metricBlocksCount(tfs); n != 0 {
|
if n := metricBlocksCount(tfs); n != 0 {
|
||||||
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
|
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
|
||||||
}
|
}
|
||||||
tvs, err = s.SearchTagValues(accountID, projectID, workerTag, 1e5)
|
tvs, err = s.SearchTagValues(accountID, projectID, workerTag, 1e5, noDeadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err)
|
return fmt.Errorf("error in SearchTagValues after all the metrics are removed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue