mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: Fix flaky test: TestStorageRotateIndexDB (#7267)
This commit fixes the TestStorageRotateIndexDB flaky test reported at: #6977. Sample test failure: https://pastebin.com/bTSs8HP1 The test fails because one goroutine adds items to the indexDB table while another goroutine is closing that table. This may happen if indexDB rotation happens twice during one Storage.add() operation: - Storage.add() takes the current indexDB and adds index recods to it - First index db rotation makes the current index DB a previous one (still ok at this point) - Second index db rotation removes the indexDB that was current two rotations earlier. It does this by setting the mustDrop flag to true and decrementing the ref counter. The ref counter reaches zero which cases the underlying indexdb table to release its resources gracefully. Graceful release assumes that the table is not written anymore. But Storage.add() still adds items to it. The solution is to increment the indexDB ref counters while it is used inside add(). The unit test has been changed a little so that the test fails reliably. The idea is to make add() function invocation to last much longer, therefore the test inserts not just one record at a time but thouthands of them. To see the test fail, just replace the idbsLocked() func with: ```go unc (s *Storage) idbsLocked2() (*indexDB, *indexDB, func()) { return s.idbCurr.Load(), s.idbNext.Load(), func() {} } ``` --------- Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
parent
7e53324f5d
commit
6b9f57e5f7
1 changed files with 62 additions and 65 deletions
|
@ -1214,83 +1214,80 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
|||
}
|
||||
|
||||
func TestStorageRotateIndexDB(t *testing.T) {
|
||||
path := "TestStorageRotateIndexDB"
|
||||
s := MustOpenStorage(path, 0, 0, 0)
|
||||
defer testRemoveAll(t)
|
||||
|
||||
// Start indexDB rotater in a separate goroutine
|
||||
stopCh := make(chan struct{})
|
||||
rotateDoneCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
close(rotateDoneCh)
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Millisecond)
|
||||
s.mustRotateIndexDB(time.Now())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Run concurrent workers that insert / select data from the storage.
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
go func(workerNum int) {
|
||||
ch <- testStorageAddMetrics(s, workerNum)
|
||||
}(i)
|
||||
const (
|
||||
numRotations = 4
|
||||
numWorkers = 10
|
||||
numRows = 10000
|
||||
)
|
||||
tr := TimeRange{
|
||||
MinTimestamp: time.Now().UTC().Add(-numRows * time.Hour).UnixMilli(),
|
||||
MaxTimestamp: time.Now().UTC().UnixMilli(),
|
||||
}
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
s := MustOpenStorage(t.Name(), 0, 0, 0)
|
||||
defer s.MustClose()
|
||||
|
||||
insertAndRotateConcurrently := func(i int) (int, int) {
|
||||
var wg sync.WaitGroup
|
||||
for workerNum := range numWorkers {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
prefix := fmt.Sprintf("metric_%d_%d", i, workerNum)
|
||||
mrs := testGenerateMetricRowsWithPrefix(rng, numRows, prefix, tr)
|
||||
s.AddRows(mrs, defaultPrecisionBits)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
s.mustRotateIndexDB(time.Now())
|
||||
wg.Wait()
|
||||
s.DebugFlush()
|
||||
|
||||
idbCurr := s.idb()
|
||||
idbPrev := idbCurr.extDB
|
||||
isCurr := idbCurr.getIndexSearch(noDeadline)
|
||||
defer idbCurr.putIndexSearch(isCurr)
|
||||
isPrev := idbPrev.getIndexSearch(noDeadline)
|
||||
defer idbPrev.putIndexSearch(isPrev)
|
||||
|
||||
return testCountAllMetricNamesNoExtDB(isPrev, tr), testCountAllMetricNamesNoExtDB(isCurr, tr)
|
||||
}
|
||||
|
||||
close(stopCh)
|
||||
<-rotateDoneCh
|
||||
var oldCurr int
|
||||
for i := range numRotations {
|
||||
newPrev, newCurr := insertAndRotateConcurrently(i)
|
||||
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
var m Metrics
|
||||
s.UpdateMetrics(&m)
|
||||
if got, want := m.TableMetrics.TotalRowsCount(), uint64(numWorkers*numRows*(i+1)); got != want {
|
||||
t.Errorf("[rotation %d] unexpected row count: got %d, want %d", i, got, want)
|
||||
}
|
||||
|
||||
if got, want := newPrev-oldCurr+newCurr, numWorkers*numRows; got != want {
|
||||
t.Errorf("[rotation %d] unexpected metric count count: got (%d - %d) + %d = %d, want %d", i, newPrev, oldCurr, newCurr, got, want)
|
||||
}
|
||||
oldCurr = newCurr
|
||||
}
|
||||
}
|
||||
|
||||
func testStorageAddMetrics(s *Storage, workerNum int) error {
|
||||
rng := rand.New(rand.NewSource(1))
|
||||
const rowsCount = 1e3
|
||||
|
||||
var mn MetricName
|
||||
mn.Tags = []Tag{
|
||||
{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
|
||||
{[]byte("instance"), []byte("1.2.3.4")},
|
||||
func testCountAllMetricNamesNoExtDB(is *indexSearch, tr TimeRange) int {
|
||||
tfss := NewTagFilters()
|
||||
if err := tfss.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
|
||||
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
|
||||
}
|
||||
for i := 0; i < rowsCount; i++ {
|
||||
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rng.Intn(10)))
|
||||
metricNameRaw := mn.marshalRaw(nil)
|
||||
timestamp := rng.Int63n(1e10)
|
||||
value := rng.NormFloat64() * 1e6
|
||||
|
||||
mr := MetricRow{
|
||||
MetricNameRaw: metricNameRaw,
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
}
|
||||
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
|
||||
metricIDs, err := is.searchMetricIDs(nil, []*TagFilters{tfss}, tr, 1e9)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("searchMetricIDs failed unexpectedly: %v", err))
|
||||
}
|
||||
|
||||
// Verify the storage contains rows.
|
||||
minRowsExpected := uint64(rowsCount)
|
||||
var m Metrics
|
||||
s.UpdateMetrics(&m)
|
||||
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
||||
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
|
||||
metricNames := map[string]bool{}
|
||||
var metricName []byte
|
||||
for _, metricID := range metricIDs {
|
||||
metricName, _ = is.searchMetricName(metricName[:0], metricID)
|
||||
metricNames[string(metricName)] = true
|
||||
}
|
||||
return nil
|
||||
return len(metricNames)
|
||||
}
|
||||
|
||||
func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue