From 683bf2a11f998c43e27c3dbc08f0ecdd0abc38ae Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 25 Jun 2019 20:09:57 +0300 Subject: [PATCH] lib/storage: make sure non-nil args are passed to openIndexDB --- lib/storage/index_db.go | 24 ++++++++++++++---------- lib/storage/index_db_test.go | 27 +++++++++++++++++++++++---- lib/storage/index_db_timing_test.go | 25 ++++++++++++++++++++++--- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 415ca76fd..a0fb9d521 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -102,6 +102,19 @@ type indexDB struct { // openIndexDB opens index db from the given path with the given caches. func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, currHourMetricIDs, prevHourMetricIDs *atomic.Value) (*indexDB, error) { + if metricIDCache == nil { + logger.Panicf("BUG: metricIDCache must be non-nil") + } + if metricNameCache == nil { + logger.Panicf("BUG: metricNameCache must be non-nil") + } + if currHourMetricIDs == nil { + logger.Panicf("BUG: currHourMetricIDs must be non-nil") + } + if prevHourMetricIDs == nil { + logger.Panicf("BUG: prevHourMetricIDs must be non-nil") + } + tb, err := mergeset.OpenTable(path) if err != nil { return nil, fmt.Errorf("cannot open indexDB %q: %s", path, err) @@ -1251,11 +1264,8 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters var errTooManyMetrics = errors.New("all the tag filters match too many metrics") func (is *indexSearch) adjustMaxMetricsAdaptive(maxMetrics int) int { - if is.db.prevHourMetricIDs == nil { - return maxMetrics - } hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) - if hmPrev == nil || !hmPrev.isFull { + if !hmPrev.isFull { return maxMetrics } hourMetrics := len(hmPrev.m) @@ -1678,9 +1688,6 @@ func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (m func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) (map[uint64]struct{}, bool) { minHour := uint64(tr.MinTimestamp) / msecPerHour maxHour := uint64(tr.MaxTimestamp) / msecPerHour - if is.db.currHourMetricIDs == nil { - return nil, false - } hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { // The tr fits the current hour. @@ -1691,9 +1698,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) } return getMetricIDsCopy(hmCurr.m), true } - if is.db.prevHourMetricIDs == nil { - return nil, false - } hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { // The tr fits the previous hour. diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 6e35eacbc..fcb3dcf22 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "reflect" + "sync/atomic" "testing" "time" @@ -60,8 +61,14 @@ func TestIndexDBOpenClose(t *testing.T) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + for i := 0; i < 5; i++ { - db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -82,8 +89,14 @@ func TestIndexDB(t *testing.T) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + dbName := "test-index-db-serial" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -113,7 +126,7 @@ func TestIndexDB(t *testing.T) { // Re-open the db and verify it works as expected. db.MustClose() - db, err = openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err = openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } @@ -133,8 +146,14 @@ func TestIndexDB(t *testing.T) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + dbName := "test-index-db-concurrent" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { t.Fatalf("cannot open indexDB: %s", err) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 4f03d925d..36f9e518d 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "sync/atomic" "testing" "github.com/VictoriaMetrics/fastcache" @@ -16,8 +17,14 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + const dbName = "bench-index-db-add-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -76,8 +83,14 @@ func BenchmarkIndexDBSearchTSIDs(b *testing.B) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + const dbName = "bench-index-db-search-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) } @@ -145,8 +158,14 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { metricNameCache := fastcache.New(1234) defer metricIDCache.Reset() defer metricNameCache.Reset() + + var hmCurr atomic.Value + hmCurr.Store(&hourMetricIDs{}) + var hmPrev atomic.Value + hmPrev.Store(&hourMetricIDs{}) + const dbName = "bench-index-db-get-tsids" - db, err := openIndexDB(dbName, metricIDCache, metricNameCache, nil, nil) + db, err := openIndexDB(dbName, metricIDCache, metricNameCache, &hmCurr, &hmPrev) if err != nil { b.Fatalf("cannot open indexDB: %s", err) }