diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 5291bdb91..dcea69b36 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1214,83 +1214,80 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error { } func TestStorageRotateIndexDB(t *testing.T) { - path := "TestStorageRotateIndexDB" - s := MustOpenStorage(path, 0, 0, 0) + defer testRemoveAll(t) - // Start indexDB rotater in a separate goroutine - stopCh := make(chan struct{}) - rotateDoneCh := make(chan struct{}) - go func() { - for { - select { - case <-stopCh: - close(rotateDoneCh) - return - default: - time.Sleep(time.Millisecond) - s.mustRotateIndexDB(time.Now()) - } - } - }() - - // Run concurrent workers that insert / select data from the storage. - ch := make(chan error, 3) - for i := 0; i < cap(ch); i++ { - go func(workerNum int) { - ch <- testStorageAddMetrics(s, workerNum) - }(i) + const ( + numRotations = 4 + numWorkers = 10 + numRows = 10000 + ) + tr := TimeRange{ + MinTimestamp: time.Now().UTC().Add(-numRows * time.Hour).UnixMilli(), + MaxTimestamp: time.Now().UTC().UnixMilli(), } - for i := 0; i < cap(ch); i++ { - select { - case err := <-ch: - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - case <-time.After(10 * time.Second): - t.Fatalf("timeout") + s := MustOpenStorage(t.Name(), 0, 0, 0) + defer s.MustClose() + + insertAndRotateConcurrently := func(i int) (int, int) { + var wg sync.WaitGroup + for workerNum := range numWorkers { + wg.Add(1) + go func() { + time.Sleep(1 * time.Millisecond) + rng := rand.New(rand.NewSource(1)) + prefix := fmt.Sprintf("metric_%d_%d", i, workerNum) + mrs := testGenerateMetricRowsWithPrefix(rng, numRows, prefix, tr) + s.AddRows(mrs, defaultPrecisionBits) + wg.Done() + }() } + s.mustRotateIndexDB(time.Now()) + wg.Wait() + s.DebugFlush() + + idbCurr := s.idb() + idbPrev := idbCurr.extDB + isCurr := idbCurr.getIndexSearch(noDeadline) + defer idbCurr.putIndexSearch(isCurr) + isPrev := idbPrev.getIndexSearch(noDeadline) + defer idbPrev.putIndexSearch(isPrev) + + return testCountAllMetricNamesNoExtDB(isPrev, tr), testCountAllMetricNamesNoExtDB(isCurr, tr) } - close(stopCh) - <-rotateDoneCh + var oldCurr int + for i := range numRotations { + newPrev, newCurr := insertAndRotateConcurrently(i) - s.MustClose() - if err := os.RemoveAll(path); err != nil { - t.Fatalf("cannot remove %q: %s", path, err) + var m Metrics + s.UpdateMetrics(&m) + if got, want := m.TableMetrics.TotalRowsCount(), uint64(numWorkers*numRows*(i+1)); got != want { + t.Errorf("[rotation %d] unexpected row count: got %d, want %d", i, got, want) + } + + if got, want := newPrev-oldCurr+newCurr, numWorkers*numRows; got != want { + t.Errorf("[rotation %d] unexpected metric count count: got (%d - %d) + %d = %d, want %d", i, newPrev, oldCurr, newCurr, got, want) + } + oldCurr = newCurr } } -func testStorageAddMetrics(s *Storage, workerNum int) error { - rng := rand.New(rand.NewSource(1)) - const rowsCount = 1e3 - - var mn MetricName - mn.Tags = []Tag{ - {[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))}, - {[]byte("instance"), []byte("1.2.3.4")}, +func testCountAllMetricNamesNoExtDB(is *indexSearch, tr TimeRange) int { + tfss := NewTagFilters() + if err := tfss.Add([]byte("__name__"), []byte(".*"), false, true); err != nil { + panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err)) } - for i := 0; i < rowsCount; i++ { - mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rng.Intn(10))) - metricNameRaw := mn.marshalRaw(nil) - timestamp := rng.Int63n(1e10) - value := rng.NormFloat64() * 1e6 - - mr := MetricRow{ - MetricNameRaw: metricNameRaw, - Timestamp: timestamp, - Value: value, - } - s.AddRows([]MetricRow{mr}, defaultPrecisionBits) + metricIDs, err := is.searchMetricIDs(nil, []*TagFilters{tfss}, tr, 1e9) + if err != nil { + panic(fmt.Sprintf("searchMetricIDs failed unexpectedly: %v", err)) } - - // Verify the storage contains rows. - minRowsExpected := uint64(rowsCount) - var m Metrics - s.UpdateMetrics(&m) - if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected { - return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount) + metricNames := map[string]bool{} + var metricName []byte + for _, metricID := range metricIDs { + metricName, _ = is.searchMetricName(metricName[:0], metricID) + metricNames[string(metricName)] = true } - return nil + return len(metricNames) } func TestStorageDeleteStaleSnapshots(t *testing.T) {