From 10f2eedee08580670f09744d59d7cc904126f1bf Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 1 Oct 2020 14:35:49 +0300
Subject: [PATCH 1/2] lib/storage: improve cache effectiveness for time series
 ids matching the given filters

Previously the maximum cache lifetime was limited to 10 seconds. Now it can
reach up to a day. This should reduce CPU usage in the following cases:

* when querying recently added data for time series with a small churn rate
* when querying historical data
---
 lib/storage/index_db.go | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 0e6db3fd62..1bdb704d59 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -454,12 +454,13 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	if versioned {
 		prefix = atomic.LoadUint64(&tagFiltersKeyGen)
 	}
-	const cacheGranularityMs = 1000 * 10
-	startTime := (uint64(tr.MinTimestamp) / cacheGranularityMs) * cacheGranularityMs
-	endTime := (uint64(tr.MaxTimestamp) / cacheGranularityMs) * cacheGranularityMs
+	// Round start and end times to per-day granularity according to the per-day inverted index.
+	startDate := uint64(tr.MinTimestamp) / msecPerDay
+	endDate := uint64(tr.MaxTimestamp) / msecPerDay
+	dst = append(dst, tagFiltersKeyVersion)
 	dst = encoding.MarshalUint64(dst, prefix)
-	dst = encoding.MarshalUint64(dst, startTime)
-	dst = encoding.MarshalUint64(dst, endTime)
+	dst = encoding.MarshalUint64(dst, startDate)
+	dst = encoding.MarshalUint64(dst, endDate)
 	for _, tfs := range tfss {
 		dst = append(dst, 0) // separator between tfs groups.
 		for i := range tfs.tfs {
@@ -469,6 +470,18 @@
 	return dst
 }
 
+// The version for the tagCache key.
+// Update it every time the key generation scheme changes in marshalTagFiltersKey.
+const tagFiltersKeyVersion = 1
+
+func invalidateTagCache() {
+	// This function must be fast, since it is called each
+	// time a new timeseries is added.
+	atomic.AddUint64(&tagFiltersKeyGen, 1)
+}
+
+var tagFiltersKeyGen uint64
+
 func marshalTSIDs(dst []byte, tsids []TSID) []byte {
 	dst = encoding.MarshalUint64(dst, uint64(len(tsids)))
 	for i := range tsids {
@@ -501,14 +514,6 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
 	return dst, nil
 }
 
-func invalidateTagCache() {
-	// This function must be fast, since it is called each
-	// time new timeseries is added.
-	atomic.AddUint64(&tagFiltersKeyGen, 1)
-}
-
-var tagFiltersKeyGen uint64
-
 // getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
 //
 // It returns io.EOF if the given mn isn't found locally.

From 764dc2499fcd4f540d4af24bb9fd906cdce26e6c Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 1 Oct 2020 19:03:34 +0300
Subject: [PATCH 2/2] lib/storage: code cleanup after
 10f2eedee08580670f09744d59d7cc904126f1bf

Remove the code that uses metricIDs caches for the current and the previous
hour during metricIDs search, since this code became unused after the per-day
inverted index was implemented almost a year ago.

While at it, fix a bug that could prevent finding time series with names
containing dots (aka Graphite-like names such as `foo.bar.baz`).
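For illustration, here is a minimal self-contained Go sketch of both changes:
the per-day cache key from 10f2eedee and the Graphite-name matching fix. The
helper names (tagFiltersKeySketch, matchTagFiltersSketch) and the concrete
graphiteReverseTagKey byte value are illustrative only, not the real
lib/storage code, which uses encoding.MarshalUint64 and the actual MetricName
and tagFilter types:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const msecPerDay = 24 * 3600 * 1000

// appendUint64 appends v to dst in big-endian form, standing in for
// encoding.MarshalUint64 from lib/encoding.
func appendUint64(dst []byte, v uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], v)
	return append(dst, buf[:]...)
}

// tagFiltersKeySketch mirrors the time-dependent part of the cache key:
// it depends only on the start and end *dates*, so all queries over the
// same day range share one cache entry for up to a day.
func tagFiltersKeySketch(gen uint64, minTimestamp, maxTimestamp int64) []byte {
	dst := make([]byte, 0, 24)
	dst = appendUint64(dst, gen) // cache generation, bumped by invalidateTagCache
	dst = appendUint64(dst, uint64(minTimestamp)/msecPerDay)
	dst = appendUint64(dst, uint64(maxTimestamp)/msecPerDay)
	return dst
}

// graphiteReverseTagKey stands in for the internal tag key used for reversed
// Graphite-like metric names; a MetricName never carries this tag.
var graphiteReverseTagKey = []byte("\xfe")

type tagFilterSketch struct {
	key   []byte
	value []byte
}

// matchTagFiltersSketch mimics the essential control flow of the fix: the
// artificial graphiteReverseTagKey filter is skipped instead of being treated
// as a mismatch, so names such as `foo.bar.baz` stay findable.
func matchTagFiltersSketch(tags map[string]string, tfs []tagFilterSketch) bool {
	for _, tf := range tfs {
		if bytes.Equal(tf.key, graphiteReverseTagKey) {
			// The metric name has no such tag; skipping it is not a mismatch.
			continue
		}
		if tags[string(tf.key)] != string(tf.value) {
			return false
		}
	}
	return true
}

func main() {
	// Two queries issued 5 minutes apart within the same day produce
	// identical keys, so the second query hits the tag filters cache.
	k1 := tagFiltersKeySketch(1, 1601539200000, 1601542800000)
	k2 := tagFiltersKeySketch(1, 1601539500000, 1601543100000)
	fmt.Println(bytes.Equal(k1, k2)) // true

	// The artificial reverse-name filter no longer zeroes out the match.
	tags := map[string]string{"__name__": "foo.bar.baz"}
	tfs := []tagFilterSketch{
		{key: []byte("__name__"), value: []byte("foo.bar.baz")},
		{key: graphiteReverseTagKey, value: []byte("zab.rab.oof")},
	}
	fmt.Println(matchTagFiltersSketch(tags, tfs)) // true with the fix applied
}

Both prints yield true: the first shows same-day queries sharing a cache key,
the second shows a dot-containing name matching despite the artificial
reverse-name filter.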
--- app/vmstorage/main.go | 6 -- lib/storage/index_db.go | 125 +++++----------------------- lib/storage/index_db_test.go | 105 ++++++----------------- lib/storage/index_db_timing_test.go | 22 +---- lib/storage/storage.go | 11 ++- 5 files changed, 54 insertions(+), 215 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index d9a6e69ed2..e2ff268b62 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -372,12 +372,6 @@ func registerStorageMetrics() { metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 { return float64(idbm().MissingTSIDsForMetricID) }) - metrics.NewGauge(`vm_recent_hour_metric_ids_search_calls_total`, func() float64 { - return float64(idbm().RecentHourMetricIDsSearchCalls) - }) - metrics.NewGauge(`vm_recent_hour_metric_ids_search_hits_total`, func() float64 { - return float64(idbm().RecentHourMetricIDsSearchHits) - }) metrics.NewGauge(`vm_date_metric_ids_search_calls_total`, func() float64 { return float64(idbm().DateMetricIDsSearchCalls) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 1bdb704d59..fe3c7e7356 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -15,7 +15,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -85,12 +84,6 @@ type indexDB struct { // High rate for this value means corrupted indexDB. missingTSIDsForMetricID uint64 - // The number of calls to search for metric ids for recent hours. - recentHourMetricIDsSearchCalls uint64 - - // The number of cache hits during search for metric ids in recent hours. - recentHourMetricIDsSearchHits uint64 - // The number of searches for metric ids by days. dateMetricIDsSearchCalls uint64 @@ -149,16 +142,10 @@ type indexDB struct { // metricIDs, since it usually requires 1 bit per deleted metricID. deletedMetricIDs atomic.Value deletedMetricIDsUpdateLock sync.Mutex - - // Global lists of metric ids for the current and the previous hours. - // They are used for fast lookups on small time ranges covering - // up to two last hours. - currHourMetricIDs *atomic.Value - prevHourMetricIDs *atomic.Value } // openIndexDB opens index db from the given path with the given caches. 
-func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
+func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (*indexDB, error) {
 	if metricIDCache == nil {
 		logger.Panicf("BUG: metricIDCache must be non-nil")
 	}
@@ -168,12 +155,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
 	if tsidCache == nil {
 		logger.Panicf("BUG: tsidCache must be non-nil")
 	}
-	if currHourMetricIDs == nil {
-		logger.Panicf("BUG: currHourMetricIDs must be non-nil")
-	}
-	if prevHourMetricIDs == nil {
-		logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
-	}
 
 	tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
 	if err != nil {
@@ -196,9 +177,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
 		tsidCache:                      tsidCache,
 		uselessTagFiltersCache:         workingsetcache.New(mem/128, time.Hour),
 		metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
-
-		currHourMetricIDs: currHourMetricIDs,
-		prevHourMetricIDs: prevHourMetricIDs,
 	}
 
 	is := db.getIndexSearch(noDeadline)
@@ -284,8 +262,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
 	m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
 	m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
-	m.RecentHourMetricIDsSearchCalls += atomic.LoadUint64(&db.recentHourMetricIDsSearchCalls)
-	m.RecentHourMetricIDsSearchHits += atomic.LoadUint64(&db.recentHourMetricIDsSearchHits)
 	m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
 	m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
 
@@ -457,7 +433,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	// Round start and end times to per-day granularity according to the per-day inverted index.
 	startDate := uint64(tr.MinTimestamp) / msecPerDay
 	endDate := uint64(tr.MaxTimestamp) / msecPerDay
-	dst = append(dst, tagFiltersKeyVersion)
 	dst = encoding.MarshalUint64(dst, prefix)
 	dst = encoding.MarshalUint64(dst, startDate)
 	dst = encoding.MarshalUint64(dst, endDate)
@@ -470,10 +445,6 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	return dst
 }
 
-// The version for the tagCache key.
-// Update it every time the key generation scheme changes in marshalTagFiltersKey.
-const tagFiltersKeyVersion = 1
-
 func invalidateTagCache() {
 	// This function must be fast, since it is called each
 	// time a new timeseries is added.
@@ -1436,7 +1407,6 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
 }
 
 func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
-	minDate := fasttime.UnixDate()
 	kb := &is.kb
 	ts := &is.ts
 	kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
@@ -1446,7 +1416,8 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
 		item := ts.Item
 		if !bytes.HasPrefix(item, prefix) {
 			// The database doesn't contain per-day inverted index yet.
-			return minDate, nil
+			// Return the minimum possible date, i.e. 0.
+ return 0, nil } suffix := item[len(prefix):] @@ -1454,14 +1425,15 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) { if len(suffix) < 8 { return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix)) } - minDate = encoding.UnmarshalUint64(suffix) + minDate := encoding.UnmarshalUint64(suffix) return minDate, nil } if err := ts.Error(); err != nil { return 0, err } // There are no (date,tag)->metricIDs entries in the database yet. - return minDate, nil + // Return the minimum possible date, i.e. 0. + return 0, nil } func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) { @@ -1788,9 +1760,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs } func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) { - // Try fast path with the minimized number of maxMetrics. - maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics) - minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted) + minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics) if err == nil { return minTf, minMetricIDs, nil } @@ -1840,29 +1810,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter maxMetrics, tr.String()) } -const maxDaysForDateMetricIDs = 40 - -func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) int { - minDate := uint64(tr.MinTimestamp) / msecPerDay - maxDate := uint64(tr.MaxTimestamp) / msecPerDay - if maxDate-minDate > maxDaysForDateMetricIDs { - // Cannot reduce maxMetrics for the given time range, - // since it is expensive extracting metricIDs for the given tr. - return maxMetrics - } - hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) - if !hmPrev.isFull { - return maxMetrics - } - hourMetrics := hmPrev.m.Len() - if maxMetrics > hourMetrics { - // It is cheaper to filter on the hour or day metrics if the minimum - // number of matching metrics across tfs exceeds hourMetrics. - return hourMetrics - } - return maxMetrics -} - func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) { kb := &is.kb kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix) @@ -2016,6 +1963,11 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) } continue } + if bytes.Equal(tf.key, graphiteReverseTagKey) { + // Skip artificial tag filter for Graphite-like metric names with dots, + // since mn doesn't contain the corresponding tag. + continue + } // Search for matching tag name. tagMatched := false @@ -2488,16 +2440,6 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet var errMissingMetricIDsForDate = errors.New("missing metricIDs for date") func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) { - atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1) - metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics) - if ok { - // Fast path: tr covers the current and / or the previous hour. - // Return the full list of metric ids for this time range. - atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1) - return metricIDs, nil - } - - // Slow path: collect the metric ids for all the days covering the given tr. 
atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1) minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp) / msecPerDay @@ -2516,7 +2458,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (* } // Slower path - query over multiple days in parallel. - metricIDs = &uint64set.Set{} + metricIDs := &uint64set.Set{} var wg sync.WaitGroup var errGlobal error var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below. @@ -2550,6 +2492,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (* return metricIDs, nil } +const maxDaysForDateMetricIDs = 40 + func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { atomic.AddUint64(&is.db.dateRangeSearchCalls, 1) minDate := uint64(tr.MinTimestamp) / msecPerDay @@ -2693,6 +2637,10 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter } metricIDs = m } + if metricIDs.Len() == 0 { + // There is no sense in inspecting tfsRemainingWithCount, since the result will be empty. + return nil, nil + } // Intersect metricIDs with the rest of filters. for i := range tfsRemainingWithCount { @@ -2742,41 +2690,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter return metricIDs, nil } -func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) { - minHour := uint64(tr.MinTimestamp) / msecPerHour - maxHour := uint64(tr.MaxTimestamp) / msecPerHour - hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { - // The tr fits the current hour. - // Return a copy of hmCurr.m, because the caller may modify - // the returned map. - if hmCurr.m.Len() > maxMetrics { - return nil, false - } - return hmCurr.m.Clone(), true - } - hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { - // The tr fits the previous hour. - // Return a copy of hmPrev.m, because the caller may modify - // the returned map. - if hmPrev.m.Len() > maxMetrics { - return nil, false - } - return hmPrev.m.Clone(), true - } - if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { - // The tr spans the previous and the current hours. 
- if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics { - return nil, false - } - metricIDs := hmCurr.m.Clone() - metricIDs.Union(hmPrev.m) - return metricIDs, true - } - return nil, false -} - func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { items := getIndexItems() defer putIndexItems(items) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index add05b537f..30652d9832 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -8,13 +8,11 @@ import ( "os" "reflect" "regexp" - "sync/atomic" "testing" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" ) @@ -455,13 +453,8 @@ func TestIndexDBOpenClose(t *testing.T) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - for i := 0; i < 5; i++ { - db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -483,13 +476,8 @@ func TestIndexDB(t *testing.T) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - dbName := "test-index-db-serial" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -519,7 +507,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. db.MustClose() - db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -542,13 +530,8 @@ func TestIndexDB(t *testing.T) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - dbName := "test-index-db-concurrent" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -821,6 +804,11 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Try tag filters. 
+ currentTime := timestampFromTime(time.Now()) + tr := TimeRange{ + MinTimestamp: currentTime - msecPerDay, + MaxTimestamp: currentTime + msecPerDay, + } for i := range mns { mn := &mns[i] tsid := &tsids[i] @@ -842,7 +830,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, false); err != nil { return fmt.Errorf("cannot add no-op negative filter: %w", err) } - tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -851,7 +839,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify tag cache. - tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter: %w", err) } @@ -863,7 +851,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err) } @@ -884,7 +872,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if tfsNew := tfs.Finalize(); len(tfsNew) > 0 { return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err) } @@ -912,7 +900,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, true, true); err != nil { return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter: %w", err) } @@ -922,7 +910,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil { return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err) } @@ -938,7 +926,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search by non-existing 
tag filter: %w", err) } @@ -954,7 +942,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC // Search with empty filter. It should match all the results. tfs.Reset() - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for common prefix: %w", err) } @@ -967,7 +955,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs.Add(nil, nil, false, false); err != nil { return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -984,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil { return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err) } - tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for empty metricGroup: %w", err) } @@ -993,7 +981,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC } // Verify empty tfss - tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline) + tsidsFound, err = db.searchTSIDs(nil, tr, 1e5, noDeadline) if err != nil { return fmt.Errorf("cannot search for nil tfss: %w", err) } @@ -1474,23 +1462,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { defer metricNameCache.Stop() defer tsidCache.Stop() - currMetricIDs := &hourMetricIDs{ - isFull: true, - m: &uint64set.Set{}, - } - - var hmCurr atomic.Value - hmCurr.Store(currMetricIDs) - - prevMetricIDs := &hourMetricIDs{ - isFull: true, - m: &uint64set.Set{}, - } - var hmPrev atomic.Value - hmPrev.Store(prevMetricIDs) - dbName := "test-index-db-ts-range" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -1509,8 +1482,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { const metricsPerDay = 1000 theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC) now := uint64(timestampFromTime(theDay)) - currMetricIDs.hour = now / msecPerHour - prevMetricIDs.hour = (now - msecPerHour) / msecPerHour baseDate := now / msecPerDay var metricNameBuf []byte for day := 0; day < days; day++ { @@ -1541,21 +1512,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } // Add the metrics to the per-day stores - date := baseDate - uint64(day*msecPerDay) + date := baseDate - uint64(day) for i := range tsids { tsid := &tsids[i] if err := is.storeDateMetricID(date, tsid.MetricID); err != nil { t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) } } - - // Add the the hour metrics caches - if day == 0 { - for i := 0; i < 256; i++ { - prevMetricIDs.m.Add(tsids[i].MetricID) - currMetricIDs.m.Add(tsids[i].MetricID) - } - } } // Flush index to disk, so it becomes visible for search @@ -1567,32 +1530,18 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { t.Fatalf("cannot add filter: %s", err) } - // Perform a search 
that can be fulfilled out of the hour metrics cache. - // This should return the metrics in the hourly cache + // Perform a search within a day. + // This should return the metrics for the day tr := TimeRange{ - MinTimestamp: int64(now - msecPerHour + 1), + MinTimestamp: int64(now - 2*msecPerHour - 1), MaxTimestamp: int64(now), } matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline) if err != nil { t.Fatalf("error searching tsids: %v", err) } - if len(matchedTSIDs) != 256 { - t.Fatal("Expected time series for current hour, got", len(matchedTSIDs)) - } - - // Perform a search within a day that falls out out of the hour metrics cache. - // This should return the metrics for the day - tr = TimeRange{ - MinTimestamp: int64(now - 2*msecPerHour - 1), - MaxTimestamp: int64(now), - } - matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline) - if err != nil { - t.Fatalf("error searching tsids: %v", err) - } if len(matchedTSIDs) != metricsPerDay { - t.Fatal("Expected time series for current day, got", len(matchedTSIDs)) + t.Fatalf("expected %d time series for current day, got %d time series", metricsPerDay, len(matchedTSIDs)) } // Perform a search across all the days, should match all metrics @@ -1606,7 +1555,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { t.Fatalf("error searching tsids: %v", err) } if len(matchedTSIDs) != metricsPerDay*days { - t.Fatal("Expected time series for all days, got", len(matchedTSIDs)) + t.Fatalf("expected %d time series for all days, got %d time series", metricsPerDay*days, len(matchedTSIDs)) } // Check GetTSDBStatusForDate diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 984ee107a2..2b4bcf3b96 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -5,7 +5,6 @@ import ( "os" "regexp" "strconv" - "sync/atomic" "testing" "time" @@ -50,13 +49,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - const dbName = "bench-index-db-add-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -120,13 +114,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - const dbName = "bench-head-posting-for-matchers" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -304,13 +293,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { defer metricNameCache.Stop() defer tsidCache.Stop() - var hmCurr atomic.Value - hmCurr.Store(&hourMetricIDs{}) - var hmPrev atomic.Value - hmPrev.Store(&hourMetricIDs{}) - const dbName = "bench-index-db-get-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 93504e693f..bbb5da9aeb 100644 --- 
a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -169,7 +169,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil { return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err) } - idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) + idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache) if err != nil { return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err) } @@ -534,7 +534,7 @@ func (s *Storage) mustRotateIndexDB() { // Create new indexdb table. newTableName := nextIndexDBTableName() idbNewPath := s.path + "/indexdb/" + newTableName - idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs) + idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache) if err != nil { logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err) } @@ -1664,8 +1664,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) { s.tsidCache.Set(metricName, buf) } -func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, - currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) { +func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (curr, prev *indexDB, err error) { if err := fs.MkdirAllIfNotExist(path); err != nil { return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err) } @@ -1724,12 +1723,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w // Open the last two tables. currPath := path + "/" + tableNames[len(tableNames)-1] - curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs) + curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache) if err != nil { return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err) } prevPath := path + "/" + tableNames[len(tableNames)-2] - prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs) + prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache) if err != nil { curr.MustClose() return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)