Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-10-01 19:31:26 +03:00
commit bdaa9a91f3
5 changed files with 67 additions and 223 deletions

View file

@ -377,12 +377,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
return float64(idbm().MissingTSIDsForMetricID)
})
metrics.NewGauge(`vm_recent_hour_metric_ids_search_calls_total`, func() float64 {
return float64(idbm().RecentHourMetricIDsSearchCalls)
})
metrics.NewGauge(`vm_recent_hour_metric_ids_search_hits_total`, func() float64 {
return float64(idbm().RecentHourMetricIDsSearchHits)
})
metrics.NewGauge(`vm_date_metric_ids_search_calls_total`, func() float64 {
return float64(idbm().DateMetricIDsSearchCalls)
})

View file

@ -15,7 +15,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@ -85,12 +84,6 @@ type indexDB struct {
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
// The number of calls to search for metric ids for recent hours.
recentHourMetricIDsSearchCalls uint64
// The number of cache hits during search for metric ids in recent hours.
recentHourMetricIDsSearchHits uint64
// The number of searches for metric ids by days.
dateMetricIDsSearchCalls uint64
@ -149,16 +142,10 @@ type indexDB struct {
// metricIDs, since it usually requires 1 bit per deleted metricID.
deletedMetricIDs atomic.Value
deletedMetricIDsUpdateLock sync.Mutex
// Global lists of metric ids for the current and the previous hours.
// They are used for fast lookups on small time ranges covering
// up to two last hours.
currHourMetricIDs *atomic.Value
prevHourMetricIDs *atomic.Value
}
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) {
func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (*indexDB, error) {
if metricIDCache == nil {
logger.Panicf("BUG: metricIDCache must be non-nil")
}
@ -168,12 +155,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
if tsidCache == nil {
logger.Panicf("BUG: tsidCache must be nin-nil")
}
if currHourMetricIDs == nil {
logger.Panicf("BUG: currHourMetricIDs must be non-nil")
}
if prevHourMetricIDs == nil {
logger.Panicf("BUG: prevHourMetricIDs must be non-nil")
}
tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
if err != nil {
@ -196,9 +177,6 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
tsidCache: tsidCache,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
currHourMetricIDs: currHourMetricIDs,
prevHourMetricIDs: prevHourMetricIDs,
}
is := db.getIndexSearch(noDeadline)
@ -284,8 +262,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.RecentHourMetricIDsSearchCalls += atomic.LoadUint64(&db.recentHourMetricIDsSearchCalls)
m.RecentHourMetricIDsSearchHits += atomic.LoadUint64(&db.recentHourMetricIDsSearchHits)
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
@ -454,12 +430,12 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
}
const cacheGranularityMs = 1000 * 10
startTime := (uint64(tr.MinTimestamp) / cacheGranularityMs) * cacheGranularityMs
endTime := (uint64(tr.MaxTimestamp) / cacheGranularityMs) * cacheGranularityMs
// Round start and end times to per-day granularity according to per-day inverted index.
startDate := uint64(tr.MinTimestamp) / msecPerDay
endDate := uint64(tr.MaxTimestamp) / msecPerDay
dst = encoding.MarshalUint64(dst, prefix)
dst = encoding.MarshalUint64(dst, startTime)
dst = encoding.MarshalUint64(dst, endTime)
dst = encoding.MarshalUint64(dst, startDate)
dst = encoding.MarshalUint64(dst, endDate)
for _, tfs := range tfss {
dst = append(dst, 0) // separator between tfs groups.
for i := range tfs.tfs {
@ -469,6 +445,14 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
return dst
}
func invalidateTagCache() {
// This function must be fast, since it is called each
// time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1)
}
var tagFiltersKeyGen uint64
func marshalTSIDs(dst []byte, tsids []TSID) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(tsids)))
for i := range tsids {
@ -501,14 +485,6 @@ func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
return dst, nil
}
func invalidateTagCache() {
// This function must be fast, since it is called each
// time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1)
}
var tagFiltersKeyGen uint64
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
//
// It returns io.EOF if the given mn isn't found locally.
@ -1431,7 +1407,6 @@ func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
}
func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
minDate := fasttime.UnixDate()
kb := &is.kb
ts := &is.ts
kb.B = append(kb.B[:0], nsPrefixDateTagToMetricIDs)
@ -1441,7 +1416,8 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
// The databse doesn't contain per-day inverted index yet.
return minDate, nil
// Return the minimum possible date, i.e. 0.
return 0, nil
}
suffix := item[len(prefix):]
@ -1449,14 +1425,15 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
if len(suffix) < 8 {
return 0, fmt.Errorf("unexpected (date, tag)->metricIDs row len; must be at least 8 bytes; got %d bytes", len(suffix))
}
minDate = encoding.UnmarshalUint64(suffix)
minDate := encoding.UnmarshalUint64(suffix)
return minDate, nil
}
if err := ts.Error(); err != nil {
return 0, err
}
// There are no (date,tag)->metricIDs entries in the database yet.
return minDate, nil
// Return the minimum possible date, i.e. 0.
return 0, nil
}
func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
@ -1783,9 +1760,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
// Try fast path with the minimized number of maxMetrics.
maxMetricsAdjusted := is.adjustMaxMetricsAdaptive(tr, maxMetrics)
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetricsAdjusted)
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
if err == nil {
return minTf, minMetricIDs, nil
}
@ -1835,29 +1810,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilter
maxMetrics, tr.String())
}
const maxDaysForDateMetricIDs = 40
func (is *indexSearch) adjustMaxMetricsAdaptive(tr TimeRange, maxMetrics int) int {
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
if maxDate-minDate > maxDaysForDateMetricIDs {
// Cannot reduce maxMetrics for the given time range,
// since it is expensive extracting metricIDs for the given tr.
return maxMetrics
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if !hmPrev.isFull {
return maxMetrics
}
hourMetrics := hmPrev.m.Len()
if maxMetrics > hourMetrics {
// It is cheaper to filter on the hour or day metrics if the minimum
// number of matching metrics across tfs exceeds hourMetrics.
return hourMetrics
}
return maxMetrics
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
kb := &is.kb
kb.B = append(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
@ -2011,6 +1963,11 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
}
continue
}
if bytes.Equal(tf.key, graphiteReverseTagKey) {
// Skip artificial tag filter for Graphite-like metric names with dots,
// since mn doesn't contain the corresponding tag.
continue
}
// Search for matching tag name.
tagMatched := false
@ -2483,16 +2440,6 @@ var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMet
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
atomic.AddUint64(&is.db.recentHourMetricIDsSearchCalls, 1)
metricIDs, ok := is.getMetricIDsForRecentHours(tr, maxMetrics)
if ok {
// Fast path: tr covers the current and / or the previous hour.
// Return the full list of metric ids for this time range.
atomic.AddUint64(&is.db.recentHourMetricIDsSearchHits, 1)
return metricIDs, nil
}
// Slow path: collect the metric ids for all the days covering the given tr.
atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
@ -2511,7 +2458,7 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
}
// Slower path - query over multiple days in parallel.
metricIDs = &uint64set.Set{}
metricIDs := &uint64set.Set{}
var wg sync.WaitGroup
var errGlobal error
var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
@ -2545,6 +2492,8 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*
return metricIDs, nil
}
const maxDaysForDateMetricIDs = 40
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
@ -2688,6 +2637,10 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
metricIDs = m
}
if metricIDs.Len() == 0 {
// There is no sense in inspecting tfsRemainingWithCount, since the result will be empty.
return nil, nil
}
// Intersect metricIDs with the rest of filters.
for i := range tfsRemainingWithCount {
@ -2737,41 +2690,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return metricIDs, nil
}
func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (*uint64set.Set, bool) {
minHour := uint64(tr.MinTimestamp) / msecPerHour
maxHour := uint64(tr.MaxTimestamp) / msecPerHour
hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
// The tr fits the current hour.
// Return a copy of hmCurr.m, because the caller may modify
// the returned map.
if hmCurr.m.Len() > maxMetrics {
return nil, false
}
return hmCurr.m.Clone(), true
}
hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
// The tr fits the previous hour.
// Return a copy of hmPrev.m, because the caller may modify
// the returned map.
if hmPrev.m.Len() > maxMetrics {
return nil, false
}
return hmPrev.m.Clone(), true
}
if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
// The tr spans the previous and the current hours.
if hmCurr.m.Len()+hmPrev.m.Len() > maxMetrics {
return nil, false
}
metricIDs := hmCurr.m.Clone()
metricIDs.Union(hmPrev.m)
return metricIDs, true
}
return nil, false
}
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
items := getIndexItems()
defer putIndexItems(items)

View file

@ -8,13 +8,11 @@ import (
"os"
"reflect"
"regexp"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
@ -455,13 +453,8 @@ func TestIndexDBOpenClose(t *testing.T) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
for i := 0; i < 5; i++ {
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -483,13 +476,8 @@ func TestIndexDB(t *testing.T) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
dbName := "test-index-db-serial"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -519,7 +507,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected.
db.MustClose()
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -542,13 +530,8 @@ func TestIndexDB(t *testing.T) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
dbName := "test-index-db-concurrent"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -821,6 +804,11 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Try tag filters.
currentTime := timestampFromTime(time.Now())
tr := TimeRange{
MinTimestamp: currentTime - msecPerDay,
MaxTimestamp: currentTime + msecPerDay,
}
for i := range mns {
mn := &mns[i]
tsid := &tsids[i]
@ -842,7 +830,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, false); err != nil {
return fmt.Errorf("cannot add no-op negative filter: %w", err)
}
tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -851,7 +839,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify tag cache.
tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsCached, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -863,7 +851,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
}
@ -884,7 +872,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if tfsNew := tfs.Finalize(); len(tfsNew) > 0 {
return fmt.Errorf("unexpected non-empty tag filters returned by TagFilters.Finalize: %v", tfsNew)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
}
@ -912,7 +900,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, true); err != nil {
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter: %w", err)
}
@ -922,7 +910,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
}
@ -938,7 +926,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
}
@ -954,7 +942,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Search with empty filter. It should match all the results.
tfs.Reset()
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search for common prefix: %w", err)
}
@ -967,7 +955,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -984,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs([]*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -993,7 +981,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify empty tfss
tsidsFound, err = db.searchTSIDs(nil, TimeRange{}, 1e5, noDeadline)
tsidsFound, err = db.searchTSIDs(nil, tr, 1e5, noDeadline)
if err != nil {
return fmt.Errorf("cannot search for nil tfss: %w", err)
}
@ -1474,23 +1462,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
currMetricIDs := &hourMetricIDs{
isFull: true,
m: &uint64set.Set{},
}
var hmCurr atomic.Value
hmCurr.Store(currMetricIDs)
prevMetricIDs := &hourMetricIDs{
isFull: true,
m: &uint64set.Set{},
}
var hmPrev atomic.Value
hmPrev.Store(prevMetricIDs)
dbName := "test-index-db-ts-range"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1509,8 +1482,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
const metricsPerDay = 1000
theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
now := uint64(timestampFromTime(theDay))
currMetricIDs.hour = now / msecPerHour
prevMetricIDs.hour = (now - msecPerHour) / msecPerHour
baseDate := now / msecPerDay
var metricNameBuf []byte
for day := 0; day < days; day++ {
@ -1541,21 +1512,13 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
// Add the metrics to the per-day stores
date := baseDate - uint64(day*msecPerDay)
date := baseDate - uint64(day)
for i := range tsids {
tsid := &tsids[i]
if err := is.storeDateMetricID(date, tsid.MetricID); err != nil {
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}
// Add the the hour metrics caches
if day == 0 {
for i := 0; i < 256; i++ {
prevMetricIDs.m.Add(tsids[i].MetricID)
currMetricIDs.m.Add(tsids[i].MetricID)
}
}
}
// Flush index to disk, so it becomes visible for search
@ -1567,32 +1530,18 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
t.Fatalf("cannot add filter: %s", err)
}
// Perform a search that can be fulfilled out of the hour metrics cache.
// This should return the metrics in the hourly cache
// Perform a search within a day.
// This should return the metrics for the day
tr := TimeRange{
MinTimestamp: int64(now - msecPerHour + 1),
MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now),
}
matchedTSIDs, err := db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
if len(matchedTSIDs) != 256 {
t.Fatal("Expected time series for current hour, got", len(matchedTSIDs))
}
// Perform a search within a day that falls out out of the hour metrics cache.
// This should return the metrics for the day
tr = TimeRange{
MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now),
}
matchedTSIDs, err = db.searchTSIDs([]*TagFilters{tfs}, tr, 10000, noDeadline)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
if len(matchedTSIDs) != metricsPerDay {
t.Fatal("Expected time series for current day, got", len(matchedTSIDs))
t.Fatalf("expected %d time series for current day, got %d time series", metricsPerDay, len(matchedTSIDs))
}
// Perform a search across all the days, should match all metrics
@ -1606,7 +1555,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
t.Fatalf("error searching tsids: %v", err)
}
if len(matchedTSIDs) != metricsPerDay*days {
t.Fatal("Expected time series for all days, got", len(matchedTSIDs))
t.Fatalf("expected %d time series for all days, got %d time series", metricsPerDay*days, len(matchedTSIDs))
}
// Check GetTSDBStatusForDate

View file

@ -5,7 +5,6 @@ import (
"os"
"regexp"
"strconv"
"sync/atomic"
"testing"
"time"
@ -50,13 +49,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-index-db-add-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -120,13 +114,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-head-posting-for-matchers"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -304,13 +293,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
defer metricNameCache.Stop()
defer tsidCache.Stop()
var hmCurr atomic.Value
hmCurr.Store(&hourMetricIDs{})
var hmPrev atomic.Value
hmPrev.Store(&hourMetricIDs{})
const dbName = "bench-index-db-get-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, &hmCurr, &hmPrev)
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}

View file

@ -169,7 +169,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
}
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
if err != nil {
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
}
@ -534,7 +534,7 @@ func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
idbNewPath := s.path + "/indexdb/" + newTableName
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, &s.currHourMetricIDs, &s.prevHourMetricIDs)
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache)
if err != nil {
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
}
@ -1664,8 +1664,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
s.tsidCache.Set(metricName, buf)
}
func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache,
currHourMetricIDs, prevHourMetricIDs *atomic.Value) (curr, prev *indexDB, err error) {
func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache) (curr, prev *indexDB, err error) {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
}
@ -1724,12 +1723,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
// Open the last two tables.
currPath := path + "/" + tableNames[len(tableNames)-1]
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache)
if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
}
prevPath := path + "/" + tableNames[len(tableNames)-2]
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, currHourMetricIDs, prevHourMetricIDs)
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache)
if err != nil {
curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)