lib/storage: slightly reduce code difference between single-node and cluster versions

Aliaksandr Valialkin 2020-07-24 00:30:33 +03:00
parent 8d1721d128
commit 94cc677b0c
4 changed files with 121 additions and 109 deletions
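
The pattern across all four changed files is the same: accountID and projectID stop being per-call parameters and become fields of the pooled indexSearch, set when the searcher is acquired via getIndexSearch(accountID, projectID, deadline) and cleared again in putIndexSearch, with a new is.marshalCommonPrefix helper building tenant-scoped index keys from the receiver. The shared search helpers therefore lose their tenant arguments and read the same as in the single-node edition. A self-contained Go sketch of that shape, with hypothetical names and an assumed 1-byte-namespace + big-endian tenant key layout (not the actual VictoriaMetrics code):

package main

import (
	"fmt"
	"sync"
)

// searcher stands in for indexSearch: per-search state, including the tenant,
// lives on the pooled object instead of in every helper signature.
type searcher struct {
	accountID uint32
	projectID uint32
	deadline  uint64
	buf       []byte
}

var searcherPool sync.Pool

// getSearcher mirrors the new getIndexSearch(accountID, projectID, deadline):
// acquire from the pool and pin the tenant for the lifetime of the search.
func getSearcher(accountID, projectID uint32, deadline uint64) *searcher {
	v := searcherPool.Get()
	if v == nil {
		v = &searcher{}
	}
	s := v.(*searcher)
	s.accountID = accountID
	s.projectID = projectID
	s.deadline = deadline
	return s
}

// putSearcher mirrors putIndexSearch: clear the tenant before pooling,
// so a recycled searcher never leaks another tenant's IDs.
func putSearcher(s *searcher) {
	s.accountID = 0
	s.projectID = 0
	s.deadline = 0
	s.buf = s.buf[:0]
	searcherPool.Put(s)
}

// marshalCommonPrefix mirrors is.marshalCommonPrefix: the key prefix is built
// from the receiver's tenant, not from caller-supplied arguments.
func (s *searcher) marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
	dst = append(dst, nsPrefix)
	dst = append(dst, byte(s.accountID>>24), byte(s.accountID>>16), byte(s.accountID>>8), byte(s.accountID))
	dst = append(dst, byte(s.projectID>>24), byte(s.projectID>>16), byte(s.projectID>>8), byte(s.projectID))
	return dst
}

func main() {
	is := getSearcher(12, 34, 0)
	defer putSearcher(is)
	key := is.marshalCommonPrefix(nil, 0x02)
	// prints 020000000c00000022: 0x02, then accountID 12 and projectID 34 as big-endian uint32s
	fmt.Printf("tenant-scoped key prefix: %x\n", key)
}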


@ -201,7 +201,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
prevHourMetricIDs: prevHourMetricIDs,
}
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
dmis, err := is.loadDeletedMetricIDs()
db.putIndexSearch(is)
if err != nil {
@ -209,7 +209,7 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
}
db.setDeletedMetricIDs(dmis)
is = db.getIndexSearch(noDeadline)
is = db.getIndexSearch(0, 0, noDeadline)
date, err := is.getStartDateForPerDayInvertedIndex()
db.putIndexSearch(is)
if err != nil {
@ -526,7 +526,7 @@ var tagFiltersKeyGen uint64
//
// It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
err := is.getTSIDByMetricName(dst, metricName)
db.putIndexSearch(is)
if err == nil {
@ -549,6 +549,9 @@ type indexSearch struct {
kb bytesutil.ByteBuffer
mp tagToMetricIDsRowParser
accountID uint32
projectID uint32
// deadline in unix timestamp seconds for the given search.
deadline uint64
@ -590,7 +593,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error
return nil
}
func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
func (db *indexDB) getIndexSearch(accountID, projectID uint32, deadline uint64) *indexSearch {
v := db.indexSearchPool.Get()
if v == nil {
v = &indexSearch{
@ -599,6 +602,8 @@ func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
}
is := v.(*indexSearch)
is.ts.Init(db.tb, shouldCacheBlock)
is.accountID = accountID
is.projectID = projectID
is.deadline = deadline
return is
}
@ -607,6 +612,8 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
is.ts.MustClose()
is.kb.Reset()
is.mp.Reset()
is.accountID = 0
is.projectID = 0
is.deadline = 0
// Do not reset tsidByNameMisses and tsidByNameSkips,
@ -758,16 +765,16 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, de
tks := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
is := db.getIndexSearch(accountID, projectID, deadline)
err := is.searchTagKeys(tks, maxTagKeys)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagKeys(accountID, projectID, tks, maxTagKeys)
is := extDB.getIndexSearch(accountID, projectID, deadline)
err = is.searchTagKeys(tks, maxTagKeys)
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -784,14 +791,14 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, de
return keys, nil
}
func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string]struct{}, maxTagKeys int) error {
func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
prefix := kb.B
ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() {
@ -818,7 +825,7 @@ func (is *indexSearch) searchTagKeys(accountID, projectID uint32, tks map[string
// Search for the next tag key.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag key.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B[len(kb.B)-1]++
ts.Seek(kb.B)
@ -834,15 +841,15 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m
// TODO: cache results?
tvs := make(map[string]struct{})
is := db.getIndexSearch(deadline)
err := is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
is := db.getIndexSearch(accountID, projectID, deadline)
err := is.searchTagValues(tvs, tagKey, maxTagValues)
db.putIndexSearch(is)
if err != nil {
return nil, err
}
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
err = is.searchTagValues(accountID, projectID, tvs, tagKey, maxTagValues)
is := extDB.getIndexSearch(accountID, projectID, deadline)
err = is.searchTagValues(tvs, tagKey, maxTagValues)
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -863,14 +870,14 @@ func (db *indexDB) SearchTagValues(accountID, projectID uint32, tagKey []byte, m
return tagValues, nil
}
func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[string]struct{}, tagKey []byte, maxTagValues int) error {
func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, maxTagValues int) error {
ts := &is.ts
kb := &is.kb
mp := &is.mp
mp.Reset()
dmis := is.db.getDeletedMetricIDs()
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, tagKey)
prefix := kb.B
ts.Seek(prefix)
@ -904,7 +911,7 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri
// Search for the next tag value.
// The last char in kb.B must be tagSeparatorChar.
// Just increment it in order to jump to the next tag value.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
kb.B = marshalTagValue(kb.B, mp.Tag.Value)
kb.B[len(kb.B)-1]++
@ -921,8 +928,8 @@ func (is *indexSearch) searchTagValues(accountID, projectID uint32, tvs map[stri
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
func (db *indexDB) GetSeriesCount(accountID, projectID uint32, deadline uint64) (uint64, error) {
is := db.getIndexSearch(deadline)
n, err := is.getSeriesCount(accountID, projectID)
is := db.getIndexSearch(accountID, projectID, deadline)
n, err := is.getSeriesCount()
db.putIndexSearch(is)
if err != nil {
return 0, err
@ -930,8 +937,8 @@ func (db *indexDB) GetSeriesCount(accountID, projectID uint32, deadline uint64)
var nExt uint64
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
nExt, err = is.getSeriesCount(accountID, projectID)
is := extDB.getIndexSearch(accountID, projectID, deadline)
nExt, err = is.getSeriesCount()
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -940,14 +947,14 @@ func (db *indexDB) GetSeriesCount(accountID, projectID uint32, deadline uint64)
return n + nExt, nil
}
func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, error) {
func (is *indexSearch) getSeriesCount() (uint64, error) {
ts := &is.ts
kb := &is.kb
mp := &is.mp
loopsPaceLimiter := 0
var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B)
for ts.NextItem() {
@ -983,8 +990,8 @@ func (is *indexSearch) getSeriesCount(accountID, projectID uint32) (uint64, erro
// GetTSDBStatusForDate returns topN entries for tsdb status for the given date, accountID and projectID.
func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline uint64) (*TSDBStatus, error) {
is := db.getIndexSearch(deadline)
status, err := is.getTSDBStatusForDate(accountID, projectID, date, topN)
is := db.getIndexSearch(accountID, projectID, deadline)
status, err := is.getTSDBStatusForDate(date, topN)
db.putIndexSearch(is)
if err != nil {
return nil, err
@ -996,8 +1003,8 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
// The entries weren't found in the db. Try searching them in extDB.
ok := db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
status, err = is.getTSDBStatusForDate(accountID, projectID, date, topN)
is := extDB.getIndexSearch(accountID, projectID, deadline)
status, err = is.getTSDBStatusForDate(date, topN)
extDB.putIndexSearch(is)
})
if ok && err != nil {
@ -1006,7 +1013,7 @@ func (db *indexDB) GetTSDBStatusForDate(accountID, projectID uint32, date uint64
return status, nil
}
func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int) (*TSDBStatus, error) {
func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus, error) {
ts := &is.ts
kb := &is.kb
mp := &is.mp
@ -1018,7 +1025,7 @@ func (is *indexSearch) getTSDBStatusForDate(accountID, projectID uint32, date ui
nameEqualBytes := []byte("__name__=")
loopsPaceLimiter := 0
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
prefix := kb.B
ts.Seek(prefix)
@ -1178,8 +1185,8 @@ func (th *topHeap) Pop() interface{} {
// searchMetricName appends metric name for the given metricID to dst
// and returns the result.
func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) {
is := db.getIndexSearch(noDeadline)
dst, err := is.searchMetricName(dst, metricID, accountID, projectID)
is := db.getIndexSearch(accountID, projectID, noDeadline)
dst, err := is.searchMetricName(dst, metricID)
db.putIndexSearch(is)
if err != io.EOF {
@ -1188,8 +1195,8 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, proj
// Try searching in the external indexDB.
if db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(noDeadline)
dst, err = is.searchMetricName(dst, metricID, accountID, projectID)
is := extDB.getIndexSearch(accountID, projectID, noDeadline)
dst, err = is.searchMetricName(dst, metricID)
extDB.putIndexSearch(is)
}) {
return dst, err
@ -1223,7 +1230,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
MinTimestamp: 0,
MaxTimestamp: (1 << 63) - 1,
}
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(tfss[0].accountID, tfss[0].projectID, noDeadline)
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
db.putIndexSearch(is)
if err != nil {
@ -1379,7 +1386,9 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
}
// Slow path - search for tsids in the db and extDB.
is := db.getIndexSearch(deadline)
accountID := tfss[0].accountID
projectID := tfss[0].projectID
is := db.getIndexSearch(accountID, projectID, deadline)
localTSIDs, err := is.searchTSIDs(tfss, tr, maxMetrics)
db.putIndexSearch(is)
if err != nil {
@ -1398,7 +1407,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
extTSIDs = tsids
return
}
is := extDB.getIndexSearch(deadline)
is := extDB.getIndexSearch(accountID, projectID, deadline)
extTSIDs, err = is.searchTSIDs(tfss, tr, maxMetrics)
extDB.putIndexSearch(is)
@ -1463,7 +1472,7 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
return io.EOF
}
func (is *indexSearch) searchMetricName(dst []byte, metricID uint64, accountID, projectID uint32) ([]byte, error) {
func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
metricName := is.db.getMetricNameFromCache(dst, metricID)
if len(metricName) > len(dst) {
return metricName, nil
@ -1471,7 +1480,7 @@ func (is *indexSearch) searchMetricName(dst []byte, metricID uint64, accountID,
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToMetricName, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToMetricName)
kb.B = encoding.MarshalUint64(kb.B, metricID)
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
if err == io.EOF {
@ -1512,13 +1521,13 @@ func mergeTSIDs(a, b []TSID) []TSID {
return tsids
}
func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint32) (bool, error) {
func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
ts := &is.ts
kb := &is.kb
// Verify whether the maximum date in `ts` covers tr.MinTimestamp.
minDate := uint64(tr.MinTimestamp) / msecPerDay
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
prefix := kb.B
kb.B = encoding.MarshalUint64(kb.B, minDate)
ts.Seek(kb.B)
@ -1536,9 +1545,7 @@ func (is *indexSearch) containsTimeRange(tr TimeRange, accountID, projectID uint
}
func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
accountID := tfss[0].accountID
projectID := tfss[0].projectID
ok, err := is.containsTimeRange(tr, accountID, projectID)
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
@ -1576,7 +1583,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(&tsids[i], metricID, accountID, projectID); err != nil {
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
@ -1596,12 +1603,12 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
return tsids, nil
}
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64, accountID, projectID uint32) error {
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID)
kb.B = encoding.MarshalUint64(kb.B, metricID)
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
if err == io.EOF {
@ -1622,7 +1629,7 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64, accountID,
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
// and adds matching metrics to metricIDs.
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter, accountID, projectID uint32) error {
func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error {
// sort srcMetricIDs in order to speed up Seek below.
sortedMetricIDs := srcMetricIDs.AppendTo(nil)
@ -1637,7 +1644,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
}
}
var err error
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID, accountID, projectID)
metricName.B, err = is.searchMetricName(metricName.B[:0], metricID)
if err != nil {
if err == io.EOF {
// It is likely the metricID->metricName entry didn't propagate to inverted index yet.
@ -1684,7 +1691,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
// Allow fetching up to 20*maxMetrics metrics for the given time range
// in the hope these metricIDs will be filtered out by other filters later.
maxTimeRangeMetrics := 20 * maxMetrics
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1, tfs.accountID, tfs.projectID)
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1)
if err == errMissingMetricIDsForDate {
// Slow path: try to find the tag filter without maxMetrics adjustment.
minTf, minMetricIDs, err = is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
@ -1801,7 +1808,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tf.Marshal(kb.B, tfs.accountID, tfs.projectID)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work below, since the tf matches at least maxMetrics metrics.
uselessTagFilters++
@ -1814,7 +1821,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
// Skip tag filters requiring to scan for too many metrics.
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tf.Marshal(kb.B, tfs.accountID, tfs.projectID)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
uselessTagFilters++
continue
@ -1825,7 +1832,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
// The tf matches at least maxMetrics. Skip it
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tf.Marshal(kb.B, tfs.accountID, tfs.projectID)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
uselessTagFilters++
continue
@ -1857,7 +1864,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
return nil, nil, errTooManyMetrics
}
metricIDs := &uint64set.Set{}
if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics); err != nil {
if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil {
return nil, nil, err
}
if metricIDs.Len() >= maxMetrics {
@ -1965,7 +1972,7 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
for _, tfs := range tfss {
if len(tfs.tfs) == 0 {
// Return all the metric ids
if err := is.updateMetricIDsAll(metricIDs, tfs.accountID, tfs.projectID, maxMetrics+1); err != nil {
if err := is.updateMetricIDsAll(metricIDs, maxMetrics+1); err != nil {
return nil, err
}
if metricIDs.Len() > maxMetrics {
@ -2032,7 +2039,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
if tf == minTf {
continue
}
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs, tfs.accountID, tfs.projectID)
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
// The tag filter requires too many index scans. Postpone it,
// so tag filters with lower number of index scans may be applied.
@ -2046,12 +2053,12 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
successfulIntersects++
}
if len(tfsPostponed) > 0 && successfulIntersects == 0 {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed, tfs.accountID, tfs.projectID)
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed)
}
for i, tf := range tfsPostponed {
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs, tfs.accountID, tfs.projectID)
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed[i:], tfs.accountID, tfs.projectID)
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed[i:])
}
if err != nil {
return err
@ -2361,9 +2368,9 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics, accountID, projectID)
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics)
if ok {
// Fast path: tr covers the current and / or the previous hour.
// Return the full list of metric ids for this time range.
@ -2381,7 +2388,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
}
if minDate == maxDate {
// Fast path - query on a single day.
metricIDs, err := is.getMetricIDsForDate(minDate, accountID, projectID, maxMetrics)
metricIDs, err := is.getMetricIDsForDate(minDate, maxMetrics)
if err != nil {
return nil, err
}
@ -2398,9 +2405,9 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int, ac
wg.Add(1)
go func(date uint64) {
defer wg.Done()
isLocal := is.db.getIndexSearch(is.deadline)
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDate(date, accountID, projectID, maxMetrics)
m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
@ -2455,7 +2462,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
wg.Add(1)
go func(date uint64) {
defer wg.Done()
isLocal := is.db.getIndexSearch(is.deadline)
isLocal := is.db.getIndexSearch(is.accountID, is.projectID, is.deadline)
defer is.db.putIndexSearch(isLocal)
m, err := isLocal.getMetricIDsForDateAndFilters(date, tfs, maxMetrics)
mu.Lock()
@ -2500,7 +2507,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
var buf []byte
for i := range tfs.tfs {
tf := &tfs.tfs[i]
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, tfs.accountID, tfs.projectID)
kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, is.accountID, is.projectID)
buf = is.db.metricIDsPerDateTagFilterCache.Get(buf[:0], kb.B)
count := uint64(0)
if len(buf) == 8 {
@ -2530,7 +2537,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i])
continue
}
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
if err != nil {
return nil, err
}
@ -2551,7 +2558,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// All the filters in tfs are negative or match too many time series.
// Populate all the metricIDs for the given (date),
// so later they can be filtered out with negative filters.
m, err := is.getMetricIDsForDate(date, tfs.accountID, tfs.projectID, maxDateMetrics)
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
if err != nil {
if err == errMissingMetricIDsForDate {
// Zero time series were written on the given date.
@ -2581,7 +2588,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
break
}
tf := tfWithCount.tf
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics)
if err != nil {
return nil, err
}
@ -2608,7 +2615,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
// Apply the postponed filters via metricName match.
var m uint64set.Set
if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed, tfs.accountID, tfs.projectID); err != nil {
if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed); err != nil {
return nil, err
}
return &m, nil
@ -2616,12 +2623,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return metricIDs, nil
}
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, accountID, projectID uint32) (*uint64set.Set, bool) {
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
// Return all the metricIDs for all the (AccountID, ProjectID) entries.
// The caller is responsible for proper filtering later.
k := accountProjectKey{
AccountID: accountID,
ProjectID: projectID,
AccountID: is.accountID,
ProjectID: is.projectID,
}
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
@ -2662,11 +2669,11 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return nil, false
}
func (is *indexSearch) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error {
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
items := getIndexItems()
defer putIndexItems(items)
items.B = marshalCommonPrefix(items.B, nsPrefixDateToMetricID, accountID, projectID)
items.B = is.marshalCommonPrefix(items.B, nsPrefixDateToMetricID)
items.B = encoding.MarshalUint64(items.B, date)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
@ -2680,7 +2687,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64, accountID, proje
// There is no need in searching for metric name in is.db.extDB,
// Since the storeDateMetricID function is called only after the metricID->metricName
// is added into the current is.db.
kb.B, err = is.searchMetricName(kb.B[:0], metricID, accountID, projectID)
kb.B, err = is.searchMetricName(kb.B[:0], metricID)
if err != nil {
if err == io.EOF {
logger.Errorf("missing metricName by metricID %d; this could be the case after unclean shutdown; "+
@ -2695,7 +2702,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64, accountID, proje
if err = mn.Unmarshal(kb.B); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q obtained by metricID %d: %w", kb.B, metricID, err)
}
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
items.B = append(items.B, kb.B...)
@ -2749,10 +2756,10 @@ func reverseBytes(dst, src []byte) []byte {
return dst
}
func (is *indexSearch) hasDateMetricID(date, metricID uint64, accountID, projectID uint32) (bool, error) {
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = encoding.MarshalUint64(kb.B, metricID)
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
@ -2767,14 +2774,14 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64, accountID, project
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
// Augment tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
}
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
@ -2785,7 +2792,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
// Store the number of matching metricIDs in the cache in order to sort tag filters
// in ascending number of matching metricIDs on the next search.
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, accountID, projectID)
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
metricIDsLen := uint64(metricIDs.Len())
if err != nil {
// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
@ -2803,11 +2810,11 @@ func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter, account
return dst
}
func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) {
func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64set.Set, error) {
// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = marshalTagValue(kb.B, nil)
var metricIDs uint64set.Set
@ -2822,11 +2829,11 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uin
return &metricIDs, nil
}
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, accountID, projectID uint32, maxMetrics int) error {
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
kb := kbPool.Get()
defer kbPool.Put(kb)
// Extract all the metricIDs from (__name__=value)->metricIDs entries.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, accountID, projectID)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
}
@ -2878,7 +2885,7 @@ const maxIndexScanLoopsPerMetric = 100
// over the found metrics.
const maxIndexScanSlowLoopsPerMetric = 20
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set, accountID, projectID uint32) (*uint64set.Set, error) {
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
if filter.Len() == 0 {
return nil, nil
}
@ -2886,7 +2893,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
filterLenRounded := (uint64(filter.Len()) / 1024) * 1024
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B, accountID, projectID)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work, since the intersection will return
// errFallbackToMetricNameMatch for the given filter.
@ -2901,7 +2908,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
}
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B, accountID, projectID)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
return nil, errFallbackToMetricNameMatch
}
@ -2966,6 +2973,10 @@ func marshalCommonPrefix(dst []byte, nsPrefix byte, accountID, projectID uint32)
return dst
}
func (is *indexSearch) marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
return marshalCommonPrefix(dst, nsPrefix, is.accountID, is.projectID)
}
func unmarshalCommonPrefix(src []byte) ([]byte, byte, uint32, uint32, error) {
if len(src) < commonPrefixLen {
return nil, 0, 0, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
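
The is.marshalCommonPrefix method added just above is what lets the shared search helpers read identically in the single-node and cluster editions: the tenant now comes from the receiver, so only this small wrapper has to differ per edition. Presumably (the single-node side is not part of this commit) its counterpart is the same signature over a tenant-less prefix, roughly:

// Hypothetical single-node wrapper with the same call shape; sketch only,
// assuming the single-node marshalCommonPrefix takes no tenant arguments.
func (is *indexSearch) marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
	return marshalCommonPrefix(dst, nsPrefix)
}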


@ -629,7 +629,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
var mn MetricName
var tsid TSID
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
// Try creating too big metric group
@ -690,7 +690,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
var mns []MetricName
var tsids []TSID
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
var metricNameBuf []byte
@ -732,7 +732,9 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
if err := is.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
is.accountID = tsid.AccountID
is.projectID = tsid.ProjectID
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d, %d, %d): %w", date, tsid.MetricID, tsid.AccountID, tsid.ProjectID, err)
}
}
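
One consequence of moving the tenant onto indexSearch is visible in the hunk above: the test loop overwrites is.accountID and is.projectID in place before each storeDateMetricID call, which is fine here because the searcher is only used from a single goroutine. An equivalent, slightly heavier alternative would be to re-acquire a searcher pinned to each tsid's tenant; a sketch using the names from this diff:

for i := range tsids {
	tsid := &tsids[i]
	// Instead of mutating is.accountID / is.projectID on the shared searcher,
	// take a fresh one scoped to this tsid's tenant and return it right away.
	isLocal := db.getIndexSearch(tsid.AccountID, tsid.ProjectID, noDeadline)
	err := isLocal.storeDateMetricID(date, tsid.MetricID)
	db.putIndexSearch(isLocal)
	if err != nil {
		return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d): %w", date, tsid.MetricID, err)
	}
}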
@ -1570,12 +1572,11 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
}()
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
// Create a bunch of per-day time series
const accountID = 12345
const projectID = 85453
is := db.getIndexSearch(accountID, projectID, noDeadline)
defer db.putIndexSearch(is)
const days = 5
const metricsPerDay = 1000
theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
@ -1623,7 +1624,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
date := baseDate - uint64(day*msecPerDay)
for i := range tsids {
tsid := &tsids[i]
if err := is.storeDateMetricID(date, tsid.MetricID, tsid.AccountID, tsid.ProjectID); err != nil {
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}


@ -98,7 +98,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsPerLoop; i++ {
mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10)
@ -177,7 +177,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
b.ResetTimer()
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(tfs.accountID, tfs.projectID, noDeadline)
defer db.putIndexSearch(is)
tfss := []*TagFilters{tfs}
tr := TimeRange{
@ -344,7 +344,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
var tsid TSID
var metricName []byte
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ {
mn.AccountID = uint32(i % accountsCount)
@ -363,7 +363,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
var tsidLocal TSID
var metricNameLocal []byte
mnLocal := mn
is := db.getIndexSearch(noDeadline)
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for pb.Next() {
for i := 0; i < recordsPerLoop; i++ {


@ -907,7 +907,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
var metricName []byte
var err error
idb := s.idb()
is := idb.getIndexSearch(deadline)
is := idb.getIndexSearch(accountID, projectID, deadline)
defer idb.putIndexSearch(is)
for loops, metricID := range metricIDs {
if loops&(1<<10) == 0 {
@ -915,7 +915,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
return err
}
}
metricName, err = is.searchMetricName(metricName[:0], metricID, accountID, projectID)
metricName, err = is.searchMetricName(metricName[:0], metricID)
if err != nil && err != io.EOF {
return fmt.Errorf("error in pre-fetching metricName for metricID=%d: %w", metricID, err)
}
@ -1210,7 +1210,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
is := idb.getIndexSearch(noDeadline)
is := idb.getIndexSearch(0, 0, noDeadline)
prevMetricNameRaw = nil
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
@ -1435,7 +1435,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
return a.metricID < b.metricID
})
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)
var firstError error
prevMetricID = 0
@ -1443,8 +1443,6 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
for _, dateMetricID := range pendingDateMetricIDs {
date := dateMetricID.date
metricID := dateMetricID.metricID
accountID := dateMetricID.accountID
projectID := dateMetricID.projectID
if metricID == prevMetricID && date == prevDate {
// Fast path for bulk import of multiple rows with the same (date, metricID) pairs.
continue
@ -1456,17 +1454,19 @@ func (s *Storage) updatePerDateData(rows []rawRow) error {
// The metricID has been already added to per-day inverted index.
continue
}
ok, err := is.hasDateMetricID(date, metricID, accountID, projectID)
is.accountID = dateMetricID.accountID
is.projectID = dateMetricID.projectID
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when locating (date=%d, metricID=%d, accountID=%d, projectID=%d) in database: %w",
date, metricID, accountID, projectID, err)
date, metricID, is.accountID, is.projectID, err)
}
continue
}
if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
if err := is.storeDateMetricID(date, metricID, accountID, projectID); err != nil {
if err := is.storeDateMetricID(date, metricID); err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err)
}