Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
lib/storage: code cleanup after 10f2eedee0
Remove the code that uses metricIDs caches for the current and the previous hour during metricIDs search, since this code became unused after implementing the per-day inverted index almost a year ago. While at it, fix a bug which could prevent finding time series with names containing dots (i.e. Graphite-like names such as `foo.bar.baz`).
This commit is contained in:
parent 10f2eedee0
commit 764dc2499f

5 changed files with 54 additions and 215 deletions
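For orientation before the diff: the per-day inverted index that superseded the hourly caches keys its rows by a day number derived from the millisecond timestamp. Below is a minimal sketch of that mapping, assuming only the `msecPerDay` constant used throughout the hunks; `dateRange` is a hypothetical helper for illustration, not a function from the repo.

package main

import "fmt"

const msecPerDay = 24 * 3600 * 1000

// dateRange converts a [min, max] millisecond time range into the inclusive
// range of per-day index dates it covers. This mirrors the repeated
// startDate/endDate and minDate/maxDate computations in the diff below.
func dateRange(minTimestamp, maxTimestamp int64) (uint64, uint64) {
	return uint64(minTimestamp) / msecPerDay, uint64(maxTimestamp) / msecPerDay
}

func main() {
	// 2019-10-15 05:01 UTC and 24h later fall on two adjacent day indexes.
	minDate, maxDate := dateRange(1571115660000, 1571202060000)
	fmt.Println(minDate, maxDate) // 18184 18185
}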
@@ -372,12 +372,6 @@ func registerStorageMetrics() {
 	metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
 		return float64(idbm().MissingTSIDsForMetricID)
 	})
-	metrics.NewGauge(`vm_recent_hour_metric_ids_search_calls_total`, func() float64 {
-		return float64(idbm().RecentHourMetricIDsSearchCalls)
-	})
-	metrics.NewGauge(`vm_recent_hour_metric_ids_search_hits_total`, func() float64 {
-		return float64(idbm().RecentHourMetricIDsSearchHits)
-	})
 	metrics.NewGauge(`vm_date_metric_ids_search_calls_total`, func() float64 {
 		return float64(idbm().DateMetricIDsSearchCalls)
 	})
@@ -15,7 +15,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"

@@ -85,12 +84,6 @@ type indexDB struct {
 	// High rate for this value means corrupted indexDB.
 	missingTSIDsForMetricID uint64

-	// The number of calls to search for metric ids for recent hours.
-	recentHourMetricIDsSearchCalls uint64
-
-	// The number of cache hits during search for metric ids in recent hours.
-	recentHourMetricIDsSearchHits uint64
-
 	// The number of searches for metric ids by days.
 	dateMetricIDsSearchCalls uint64

@@ -149,16 +142,10 @@ type indexDB struct {
 	// metricIDs, since it usually requires 1 bit per deleted metricID.
 	deletedMetricIDs atomic.Value
 	deletedMetricIDsUpdateLock sync.Mutex
-
-	// Global lists of metric ids for the current and the previous hours.
-	// They are used for fast lookups on small time ranges covering
-	// up to two last hours.
-	currHourMetricIDs *atomic.Value
-	prevHourMetricIDs *atomic.Value
 }

 // openIndexDB opens index db from the given path with the given caches.
-func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
+func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (*indexDB, error) {
 	if metricIDCache == nil {
 		logger.Panicf("BUG: metricIDCache must be non-nil")
 	}

@@ -168,12 +155,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
 	if tsidCache == nil {
 		logger.Panicf("BUG: tsidCache must be nin-nil")
 	}
-	if currHourMetricIDs == nil {
-		logger.Panicf("BUG: currHourMetricIDs must be non-nil")
-	}
-	if prevHourMetricIDs == nil {
-		logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
-	}

 	tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
 	if err != nil {

@@ -196,9 +177,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
 		tsidCache: tsidCache,
 		uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
 		metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
-
-		currHourMetricIDs: currHourMetricIDs,
-		prevHourMetricIDs: prevHourMetricIDs,
 	}

 	is := db.getIndexSearch(noDeadline)

@@ -284,8 +262,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
 	m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
 	m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
-	m.RecentHourMetricIDsSearchCalls += atomic.LoadUint64(&db.recentHourMetricIDsSearchCalls)
-	m.RecentHourMetricIDsSearchHits += atomic.LoadUint64(&db.recentHourMetricIDsSearchHits)
 	m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
 	m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)

@@ -457,7 +433,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	// Round start and end times to per-day granularity according to per-day inverted index.
 	startDate := uint64(tr.MinTimestamp) / msecPerDay
 	endDate := uint64(tr.MaxTimestamp) / msecPerDay
-	dst = append(dst, tagFiltersKeyVersion)
 	dst = encoding.MarshalUint64(dst, prefix)
 	dst = encoding.MarshalUint64(dst, startDate)
 	dst = encoding.MarshalUint64(dst, endDate)

@@ -470,10 +445,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	return dst
 }

-// The version for tagCache key.
-// Update it every time key generation scheme changes at marshalTagFiltersKey.
-const tagFiltersKeyVersion = 1
-
 func invalidateTagCache() {
 	// This function must be fast, since it is called each
 	// time new timeseries is added.
@@ -1436,7 +1407,6 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
 }

 func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
-	minDate := fasttime.UnixDate()
 	kb := &is.kb
 	ts := &is.ts
 	kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)

@@ -1446,7 +1416,8 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
 		item := ts.Item
 		if !bytes.HasPrefix(item, prefix) {
 			// The databse doesn't contain per-day inverted index yet.
-			return minDate, nil
+			// Return the minimum possible date, i.e. 0.
+			return 0, nil
 		}
 		suffix := item[len(prefix):]

@@ -1454,14 +1425,15 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
 		if len(suffix) < 8 {
 			return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
 		}
-		minDate = encoding.UnmarshalUint64(suffix)
+		minDate := encoding.UnmarshalUint64(suffix)
 		return minDate, nil
 	}
 	if err := ts.Error(); err != nil {
 		return 0, err
 	}
 	// There are no (date,tag)->metricIDs entries in the database yet.
-	return minDate, nil
+	// Return the minimum possible date, i.e. 0.
+	return 0, nil
 }

 func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
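Pieced together from the three hunks above, the new behavior of `getStartDateForPerDayInvertedIndex` is: return the date encoded in the first `(date, tag)->metricIDs` row, or 0 (the minimum possible date) when no such rows exist yet, instead of defaulting to today via `fasttime.UnixDate()`. A self-contained sketch of that decision follows, with the mergeset iteration replaced by a plain byte-slice argument and `binary.BigEndian` standing in for `lib/encoding` (both are assumptions for illustration, not the repo's exact API).

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// startDateFromFirstRow mirrors the post-change logic: given the first row
// found under the (date, tag)->metricIDs prefix, return the date it encodes,
// or 0 (the minimum possible date) when the per-day index has no rows yet.
// The real method iterates a mergeset table; here the row is passed in.
func startDateFromFirstRow(item, prefix []byte) (uint64, error) {
	if !bytes.HasPrefix(item, prefix) {
		// The database doesn't contain per-day inverted index yet.
		return 0, nil
	}
	suffix := item[len(prefix):]
	if len(suffix) < 8 {
		return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
	}
	// lib/encoding.UnmarshalUint64 is big-endian, same as binary.BigEndian.
	return binary.BigEndian.Uint64(suffix[:8]), nil
}

func main() {
	prefix := []byte{0x05} // stand-in for nsPrefixDateTagToMetricIDs
	row := append(append([]byte{}, prefix...), 0, 0, 0, 0, 0, 0, 0x47, 0x08) // encodes date 18184
	fmt.Println(startDateFromFirstRow(row, prefix))                          // 18184 <nil>
}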
@@ -1788,9 +1760,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
 }

 func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
-	// Try fast path with the minimized number of maxMetrics.
-	maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics)
-	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted)
+	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
 	if err == nil {
 		return minTf, minMetricIDs, nil
 	}
@@ -1840,29 +1810,6 @@
 		maxMetrics, tr.String())
 }

-const maxDaysForDateMetricIDs = 40
-
-func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) int {
-	minDate := uint64(tr.MinTimestamp) / msecPerDay
-	maxDate := uint64(tr.MaxTimestamp) / msecPerDay
-	if maxDate-minDate > maxDaysForDateMetricIDs {
-		// Cannot reduce maxMetrics for the given time range,
-		// since it is expensive extracting metricIDs for the given tr.
-		return maxMetrics
-	}
-	hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
-	if !hmPrev.isFull {
-		return maxMetrics
-	}
-	hourMetrics := hmPrev.m.Len()
-	if maxMetrics > hourMetrics {
-		// It is cheaper to filter on the hour or day metrics if the minimum
-		// number of matching metrics across tfs exceeds hourMetrics.
-		return hourMetrics
-	}
-	return maxMetrics
-}
-
 func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
 	kb := &is.kb
 	kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
@@ -2016,6 +1963,11 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
 			}
 			continue
 		}
+		if bytes.Equal(tf.key, graphiteReverseTagKey) {
+			// Skip artificial tag filter for Graphite-like metric names with dots,
+			// since mn doesn't contain the corresponding tag.
+			continue
+		}

 		// Search for matching tag name.
 		tagMatched := false
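These five added lines are the Graphite-names bug fix from the commit message: the index uses an artificial tag under `graphiteReverseTagKey` to speed up matching on dotted names, but `matchTagFilters` compared that artificial filter against the stored `MetricName`, which (as the added comment says) never carries the corresponding tag, so the filter could never match and series like `foo.bar.baz` were dropped. Here is a simplified sketch of the fixed loop; the `tagFilter` struct and the `"\xff"` key value are illustrative stand-ins, not the repo's exact definitions.

package main

import (
	"bytes"
	"fmt"
)

// graphiteReverseTagKey marks an artificial, index-only tag; its value is
// derived from the metric name rather than stored on the MetricName itself.
// The real key is a reserved value; "\xff" is an assumption here.
var graphiteReverseTagKey = []byte("\xff")

type tagFilter struct {
	key, value []byte
}

// matchFilters mimics the fixed matchTagFilters loop: filters on the
// artificial reverse tag are skipped when matching against loaded tags,
// because the stored metric name never carries that tag. Before the fix,
// such a filter could never match, so matching series were silently
// dropped from results.
func matchFilters(tags map[string]string, tfs []tagFilter) bool {
	for _, tf := range tfs {
		if bytes.Equal(tf.key, graphiteReverseTagKey) {
			continue // already satisfied at index-scan time
		}
		if tags[string(tf.key)] != string(tf.value) {
			return false
		}
	}
	return true
}

func main() {
	tfs := []tagFilter{
		{key: graphiteReverseTagKey, value: []byte("zab.rab.oof")},
		{key: []byte("job"), value: []byte("graphite")},
	}
	fmt.Println(matchFilters(map[string]string{"job": "graphite"}, tfs)) // true; false before the fix
}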
@ -2488,16 +2440,6 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet
|
|||
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
|
||||
|
||||
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
|
||||
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
|
||||
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics)
|
||||
if ok {
|
||||
// Fast path: tr covers the current and / or the previous hour.
|
||||
// Return the full list of metric ids for this time range.
|
||||
atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1)
|
||||
return metricIDs, nil
|
||||
}
|
||||
|
||||
// Slow path: collect the metric ids for all the days covering the given tr.
|
||||
atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1)
|
||||
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
|
||||
|
@ -2516,7 +2458,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
|
|||
}
|
||||
|
||||
// Slower path - query over multiple days in parallel.
|
||||
metricIDs = &uint64set.Set{}
|
||||
metricIDs := &uint64set.Set{}
|
||||
var wg sync.WaitGroup
|
||||
var errGlobal error
|
||||
var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
|
||||
|
@ -2550,6 +2492,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
|
|||
return metricIDs, nil
|
||||
}
|
||||
|
||||
const maxDaysForDateMetricIDs = 40
|
||||
|
||||
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
|
||||
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
|
||||
minDate := uint64(tr.MinTimestamp) / msecPerDay
|
||||
|
@ -2693,6 +2637,10 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
}
|
||||
metricIDs = m
|
||||
}
|
||||
if metricIDs.Len() == 0 {
|
||||
// There is no sense in inspecting tfsRemainingWithCount, since the result will be empty.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Intersect metricIDs with the rest of filters.
|
||||
for i := range tfsRemainingWithCount {
|
||||
|
@ -2742,41 +2690,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
return metricIDs, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
|
||||
minHour := uint64(tr.MinTimestamp) / msecPerHour
|
||||
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
|
||||
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
|
||||
// The tr fits the current hour.
|
||||
// Return a copy of hmCurr.m, because the caller may modify
|
||||
// the returned map.
|
||||
if hmCurr.m.Len() > maxMetrics {
|
||||
return nil, false
|
||||
}
|
||||
return hmCurr.m.Clone(), true
|
||||
}
|
||||
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
|
||||
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
|
||||
// The tr fits the previous hour.
|
||||
// Return a copy of hmPrev.m, because the caller may modify
|
||||
// the returned map.
|
||||
if hmPrev.m.Len() > maxMetrics {
|
||||
return nil, false
|
||||
}
|
||||
return hmPrev.m.Clone(), true
|
||||
}
|
||||
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
|
||||
// The tr spans the previous and the current hours.
|
||||
if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics {
|
||||
return nil, false
|
||||
}
|
||||
metricIDs := hmCurr.m.Clone()
|
||||
metricIDs.Union(hmPrev.m)
|
||||
return metricIDs, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
||||
items := getIndexItems()
|
||||
defer putIndexItems(items)
|
||||
|
|
|
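With the hourly fast path gone, `getMetricIDsForTimeRange` always collects per-day metricID sets, and the `-2516,7` hunk above shows the days being queried in parallel under a mutex. The following is a minimal sketch of that coordination pattern; `collectPerDay` and `queryDay` are hypothetical stand-ins, and a plain map replaces the real `uint64set.Set`.

package main

import (
	"fmt"
	"sync"
)

// collectPerDay fans out one worker per day and unions the per-day results
// into a single set, keeping the first error. It mirrors the wg/mu/errGlobal
// pattern in getMetricIDsForTimeRange; queryDay stands in for the real
// per-day index lookup.
func collectPerDay(minDate, maxDate uint64, queryDay func(date uint64) (map[uint64]struct{}, error)) (map[uint64]struct{}, error) {
	metricIDs := make(map[uint64]struct{})
	var wg sync.WaitGroup
	var errGlobal error
	var mu sync.Mutex // protects metricIDs and errGlobal below.
	for date := minDate; date <= maxDate; date++ {
		wg.Add(1)
		go func(date uint64) {
			defer wg.Done()
			ids, err := queryDay(date)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errGlobal = err
				return
			}
			for id := range ids {
				metricIDs[id] = struct{}{}
			}
		}(date)
	}
	wg.Wait()
	if errGlobal != nil {
		return nil, errGlobal
	}
	return metricIDs, nil
}

func main() {
	got, err := collectPerDay(18184, 18186, func(date uint64) (map[uint64]struct{}, error) {
		return map[uint64]struct{}{date * 10: {}}, nil // fake lookup: one metricID per day
	})
	fmt.Println(len(got), err) // 3 <nil>
}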
@@ -8,13 +8,11 @@ import (
 	"os"
 	"reflect"
 	"regexp"
-	"sync/atomic"
 	"testing"
 	"time"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
 )
@@ -455,13 +453,8 @@ func TestIndexDBOpenClose(t *testing.T) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	for i := 0; i < 5; i++ {
-		db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+		db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}

@@ -483,13 +476,8 @@ func TestIndexDB(t *testing.T) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	dbName := "test-index-db-serial"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}

@@ -519,7 +507,7 @@ func TestIndexDB(t *testing.T) {

 	// Re-open the db and verify it works as expected.
 	db.MustClose()
-	db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}

@@ -542,13 +530,8 @@ func TestIndexDB(t *testing.T) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	dbName := "test-index-db-concurrent"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -821,6 +804,11 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	}

 	// Try tag filters.
+	currentTime := timestampFromTime(time.Now())
+	tr := TimeRange{
+		MinTimestamp: currentTime - msecPerDay,
+		MaxTimestamp: currentTime + msecPerDay,
+	}
 	for i := range mns {
 		mn := &mns[i]
 		tsid := &tsids[i]
@@ -842,7 +830,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, nil, true, false); err != nil {
 		return fmt.Errorf("cannot add no-op negative filter: %w", err)
 	}
-	tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by exact tag filter: %w", err)
 	}

@@ -851,7 +839,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	}

 	// Verify tag cache.
-	tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by exact tag filter: %w", err)
 	}

@@ -863,7 +851,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
 		return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
 	}

@@ -884,7 +872,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if tfsNew := tfs.Finalize(); len(tfsNew) > 0 {
 		return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
 	}

@@ -912,7 +900,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, nil, true, true); err != nil {
 		return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by regexp tag filter: %w", err)
 	}

@@ -922,7 +910,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
 		return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
 	}

@@ -938,7 +926,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
 		return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
 	}

@@ -954,7 +942,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC

 	// Search with empty filter. It should match all the results.
 	tfs.Reset()
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search for common prefix: %w", err)
 	}

@@ -967,7 +955,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs.Add(nil, nil, false, false); err != nil {
 		return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search for empty metricGroup: %w", err)
 	}

@@ -984,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
 		return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
 	}
-	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search for empty metricGroup: %w", err)
 	}

@@ -993,7 +981,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
 	}

 	// Verify empty tfss
-	tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline)
+	tsidsFound, err = db.searchTSIDs(nil, tr, 1e5, noDeadline)
 	if err != nil {
 		return fmt.Errorf("cannot search for nil tfss: %w", err)
 	}
@@ -1474,23 +1462,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	currMetricIDs := &hourMetricIDs{
-		isFull: true,
-		m: &uint64set.Set{},
-	}
-
-	var hmCurr atomic.Value
-	hmCurr.Store(currMetricIDs)
-
-	prevMetricIDs := &hourMetricIDs{
-		isFull: true,
-		m: &uint64set.Set{},
-	}
-	var hmPrev atomic.Value
-	hmPrev.Store(prevMetricIDs)
-
 	dbName := "test-index-db-ts-range"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -1509,8 +1482,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	const metricsPerDay = 1000
 	theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
 	now := uint64(timestampFromTime(theDay))
-	currMetricIDs.hour = now / msecPerHour
-	prevMetricIDs.hour = (now - msecPerHour) / msecPerHour
 	baseDate := now / msecPerDay
 	var metricNameBuf []byte
 	for day := 0; day < days; day++ {
@@ -1541,21 +1512,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 		}

 		// Add the metrics to the per-day stores
-		date := baseDate - uint64(day*msecPerDay)
+		date := baseDate - uint64(day)
 		for i := range tsids {
 			tsid := &tsids[i]
 			if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
 				t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
 			}
 		}
-
-		// Add the the hour metrics caches
-		if day == 0 {
-			for i := 0; i < 256; i++ {
-				prevMetricIDs.m.Add(tsids[i].MetricID)
-				currMetricIDs.m.Add(tsids[i].MetricID)
-			}
-		}
 	}

 	// Flush index to disk, so it becomes visible for search
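The `date` fix in this hunk deserves a note: `baseDate` is a day index (`now / msecPerDay`), so the old expression subtracted milliseconds from a day count and, in uint64 arithmetic, underflowed for any `day > 0`. A worked example:

package main

import "fmt"

const msecPerDay = 24 * 3600 * 1000

func main() {
	now := uint64(1571115660000) // 2019-10-15 05:01:00 UTC, in milliseconds
	baseDate := now / msecPerDay // 18184: a count of days, not milliseconds
	day := uint64(1)

	fmt.Println(baseDate - day)            // 18183: the previous day, as intended
	fmt.Println(baseDate - day*msecPerDay) // absurdly large value: uint64 underflow, the old bug
}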
@@ -1567,32 +1530,18 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 		t.Fatalf("cannot add filter: %s", err)
 	}

-	// Perform a search that can be fulfilled out of the hour metrics cache.
-	// This should return the metrics in the hourly cache
+	// Perform a search within a day.
+	// This should return the metrics for the day
 	tr := TimeRange{
-		MinTimestamp: int64(now - msecPerHour + 1),
+		MinTimestamp: int64(now - 2*msecPerHour - 1),
 		MaxTimestamp: int64(now),
 	}
 	matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
 	if err != nil {
 		t.Fatalf("error searching tsids: %v", err)
 	}
-	if len(matchedTSIDs) != 256 {
-		t.Fatal("Expected time series for current hour, got", len(matchedTSIDs))
-	}
-
-	// Perform a search within a day that falls out out of the hour metrics cache.
-	// This should return the metrics for the day
-	tr = TimeRange{
-		MinTimestamp: int64(now - 2*msecPerHour - 1),
-		MaxTimestamp: int64(now),
-	}
-	matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
-	if err != nil {
-		t.Fatalf("error searching tsids: %v", err)
-	}
 	if len(matchedTSIDs) != metricsPerDay {
-		t.Fatal("Expected time series for current day, got", len(matchedTSIDs))
+		t.Fatalf("expected %d time series for current day, got %d time series", metricsPerDay, len(matchedTSIDs))
 	}

 	// Perform a search across all the days, should match all metrics
@@ -1606,7 +1555,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 		t.Fatalf("error searching tsids: %v", err)
 	}
 	if len(matchedTSIDs) != metricsPerDay*days {
-		t.Fatal("Expected time series for all days, got", len(matchedTSIDs))
+		t.Fatalf("expected %d time series for all days, got %d time series", metricsPerDay*days, len(matchedTSIDs))
 	}

 	// Check GetTSDBStatusForDate
@@ -5,7 +5,6 @@ import (
 	"os"
 	"regexp"
 	"strconv"
-	"sync/atomic"
 	"testing"
 	"time"

@@ -50,13 +49,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	const dbName = "bench-index-db-add-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}

@@ -120,13 +114,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	const dbName = "bench-head-posting-for-matchers"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}

@@ -304,13 +293,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 	defer metricNameCache.Stop()
 	defer tsidCache.Stop()

-	var hmCurr atomic.Value
-	hmCurr.Store(&hourMetricIDs{})
-	var hmPrev atomic.Value
-	hmPrev.Store(&hourMetricIDs{})
-
 	const dbName = "bench-index-db-get-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
+	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -169,7 +169,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
 	}
-	idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
+	idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
 	}

@@ -534,7 +534,7 @@ func (s *Storage) mustRotateIndexDB() {
 	// Create new indexdb table.
 	newTableName := nextIndexDBTableName()
 	idbNewPath := s.path + "/indexdb/" + newTableName
-	idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
+	idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
 	if err != nil {
 		logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
 	}

@@ -1664,8 +1664,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
 	s.tsidCache.Set(metricName, buf)
 }

-func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache,
-	currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
+func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (curr, prev *indexDB, err error) {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
 	}

@@ -1724,12 +1723,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
 	// Open the last two tables.
 	currPath := path + "/" + tableNames[len(tableNames)-1]

-	curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
+	curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
 	}
 	prevPath := path + "/" + tableNames[len(tableNames)-2]
-	prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
+	prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache)
 	if err != nil {
 		curr.MustClose()
 		return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)