package storage

import (
	"fmt"
	"math/rand"
	"os"
	"reflect"
	"sort"
	"strings"
	"testing"
	"testing/quick"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

// TestReplaceAlternateRegexpsWithGraphiteWildcards verifies the output of
// replaceAlternateRegexpsWithGraphiteWildcards for a table of inputs,
// including inputs with unbalanced parentheses, which must be returned as is.
func TestReplaceAlternateRegexpsWithGraphiteWildcards(t *testing.T) {
	f := func(q, resultExpected string) {
		t.Helper()
		result := replaceAlternateRegexpsWithGraphiteWildcards([]byte(q))
		if string(result) != resultExpected {
			t.Fatalf("unexpected result for %s\ngot\n%s\nwant\n%s", q, result, resultExpected)
		}
	}
	f("", "")
	f("foo", "foo")
	f("foo(bar", "foo(bar")
	f("foo.(bar|baz", "foo.(bar|baz")
	f("foo.(bar).x", "foo.{bar}.x")
	f("foo.(bar|baz).*.{x,y}", "foo.{bar,baz}.*.{x,y}")
	f("foo.(bar|baz).*.{x,y}(z|aa)", "foo.{bar,baz}.*.{x,y}{z,aa}")
	f("foo(.*)", "foo*")
}

// TestGetRegexpForGraphiteNodeQuery verifies that getRegexpForGraphiteQuery
// returns the expected regexp string for a table of Graphite queries.
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
	f := func(q, expectedRegexp string) {
		t.Helper()
		re, err := getRegexpForGraphiteQuery(q)
		if err != nil {
			t.Fatalf("unexpected error for query=%q: %s", q, err)
		}
		reStr := re.String()
		if reStr != expectedRegexp {
			t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
		}
	}
	f(``, `^$`)
	f(`*`, `^[^.]*$`)
	f(`foo.`, `^foo\.$`)
	f(`foo.bar`, `^foo\.bar$`)
	f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
	f(`[-a-zx.]`, `^[-a-zx.]$`)
	f(`**`, `^[^.]*[^.]*$`)
	f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
	f(`foo{bar`, `^foo\{bar$`)
	f(`foo{ba,r`, `^foo\{ba,r$`)
	f(`foo[bar`, `^foo\[bar$`)
	f(`foo{bar}`, `^foobar$`)
	f(`foo{bar,,b{{a,b*},z},[x-y]*z}a`, `^foo(?:bar||b(?:(?:a|b[^.]*)|z)|[x-y][^.]*z)a$`)
}

// TestDateMetricIDCacheSerial runs testDateMetricIDCache from a single goroutine
// with all the Has / EntriesCount checks enabled.
func TestDateMetricIDCacheSerial(t *testing.T) {
	c := newDateMetricIDCache()
	if err := testDateMetricIDCache(c, false); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
}

// TestDateMetricIDCacheConcurrent runs testDateMetricIDCache from 5 goroutines
// sharing a single cache and waits up to 5 seconds for every worker result.
func TestDateMetricIDCacheConcurrent(t *testing.T) {
	c := newDateMetricIDCache()
	ch := make(chan error, 5)
	for i := 0; i < 5; i++ {
		go func() {
			ch <- testDateMetricIDCache(c, true)
		}()
	}
	for i := 0; i < 5; i++ {
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(time.Second * 5):
			t.Fatalf("timeout")
		}
	}
}

// testDateMetricIDCache exercises Set, Has, Reset, EntriesCount and syncLocked on c.
//
// When concurrent is true, the checks on Has and EntriesCount results are skipped,
// since other goroutines may modify c at the same time.
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
	type dmk struct {
		date     uint64
		metricID uint64
	}
	// m mirrors the entries expected to be present in c.
	m := make(map[dmk]bool)
	for i := 0; i < 1e5; i++ {
		date := uint64(i) % 3
		metricID := uint64(i) % 1237
		if !concurrent && c.Has(date, metricID) {
			if !m[dmk{date, metricID}] {
				return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
			}
			continue
		}
		c.Set(date, metricID)
		m[dmk{date, metricID}] = true
		if !concurrent && !c.Has(date, metricID) {
			return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
		}
		if i%11234 == 0 {
			c.mu.Lock()
			c.syncLocked()
			c.mu.Unlock()
		}
		if i%34323 == 0 {
			c.Reset()
			m = make(map[dmk]bool)
		}
	}

	// Verify fast path after sync.
	for i := 0; i < 1e5; i++ {
		date := uint64(i) % 3
		metricID := uint64(i) % 123
		c.Set(date, metricID)
	}
	c.mu.Lock()
	c.syncLocked()
	c.mu.Unlock()
	for i := 0; i < 1e5; i++ {
		date := uint64(i) % 3
		metricID := uint64(i) % 123
		if !concurrent && !c.Has(date, metricID) {
			return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
		}
	}

	// Verify c.Reset
	if n := c.EntriesCount(); !concurrent && n < 123 {
		return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
	}
	c.Reset()
	if n := c.EntriesCount(); !concurrent && n > 0 {
		return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
	}
	return nil
}

// TestUpdateCurrHourMetricIDs verifies Storage.updateCurrHourMetricIDs for the four
// combinations of empty / non-empty pending entries and stale / valid current hour.
func TestUpdateCurrHourMetricIDs(t *testing.T) {
	// newStorage returns a zero Storage with only the fields read by
	// updateCurrHourMetricIDs in this test initialized.
	newStorage := func() *Storage {
		var s Storage
		s.currHourMetricIDs.Store(&hourMetricIDs{})
		s.prevHourMetricIDs.Store(&hourMetricIDs{})
		s.pendingHourEntries = &uint64set.Set{}
		return &s
	}
	t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
		s := newStorage()
		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
		hmOrig := &hourMetricIDs{
			m:    &uint64set.Set{},
			hour: 123,
		}
		hmOrig.m.Add(12)
		hmOrig.m.Add(34)
		s.currHourMetricIDs.Store(hmOrig)
		s.updateCurrHourMetricIDs()
		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
		if hmCurr.hour != hour {
			// It is possible new hour occurred. Update the hour and verify it again.
			hour = uint64(timestampFromTime(time.Now())) / msecPerHour
			if hmCurr.hour != hour {
				t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
			}
		}
		if hmCurr.m.Len() != 0 {
			t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
		}
		if !hmCurr.isFull {
			t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
		}

		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
		if !reflect.DeepEqual(hmPrev, hmOrig) {
			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
		}

		if s.pendingHourEntries.Len() != 0 {
			t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
		}
	})
	t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
		s := newStorage()
		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
		hmOrig := &hourMetricIDs{
			m:    &uint64set.Set{},
			hour: hour,
		}
		hmOrig.m.Add(12)
		hmOrig.m.Add(34)
		s.currHourMetricIDs.Store(hmOrig)
		s.updateCurrHourMetricIDs()
		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
		if hmCurr.hour != hour {
			// It is possible new hour occurred. Update the hour and verify it again.
			hour = uint64(timestampFromTime(time.Now())) / msecPerHour
			if hmCurr.hour != hour {
				t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
			}
			// Do not run other checks, since they may fail.
			return
		}
		if !reflect.DeepEqual(hmCurr, hmOrig) {
			t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
		}
		if hmCurr.isFull {
			t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
		}

		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
		hmEmpty := &hourMetricIDs{}
		if !reflect.DeepEqual(hmPrev, hmEmpty) {
			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
		}

		if s.pendingHourEntries.Len() != 0 {
			t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
		}
	})
	t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
		s := newStorage()
		pendingHourEntries := &uint64set.Set{}
		pendingHourEntries.Add(343)
		pendingHourEntries.Add(32424)
		pendingHourEntries.Add(8293432)
		s.pendingHourEntries = pendingHourEntries

		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
		hmOrig := &hourMetricIDs{
			m:    &uint64set.Set{},
			hour: 123,
		}
		hmOrig.m.Add(12)
		hmOrig.m.Add(34)
		s.currHourMetricIDs.Store(hmOrig)
		s.updateCurrHourMetricIDs()
		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
		if hmCurr.hour != hour {
			// It is possible new hour occurred. Update the hour and verify it again.
			hour = uint64(timestampFromTime(time.Now())) / msecPerHour
			if hmCurr.hour != hour {
				t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
			}
		}
		if !hmCurr.m.Equal(pendingHourEntries) {
			t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
		}
		if !hmCurr.isFull {
			t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
		}

		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
		if !reflect.DeepEqual(hmPrev, hmOrig) {
			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
		}

		if s.pendingHourEntries.Len() != 0 {
			t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
		}
	})
	t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
		s := newStorage()
		pendingHourEntries := &uint64set.Set{}
		pendingHourEntries.Add(343)
		pendingHourEntries.Add(32424)
		pendingHourEntries.Add(8293432)
		s.pendingHourEntries = pendingHourEntries

		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
		hmOrig := &hourMetricIDs{
			m:    &uint64set.Set{},
			hour: hour,
		}
		hmOrig.m.Add(12)
		hmOrig.m.Add(34)
		s.currHourMetricIDs.Store(hmOrig)
		s.updateCurrHourMetricIDs()
		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
		if hmCurr.hour != hour {
			// It is possible new hour occurred. Update the hour and verify it again.
			hour = uint64(timestampFromTime(time.Now())) / msecPerHour
			if hmCurr.hour != hour {
				t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
			}
			// Do not run other checks, since they may fail.
			return
		}
		// The expected set is the union of the pending entries and the original entries.
		m := pendingHourEntries.Clone()
		hmOrig.m.ForEach(func(part []uint64) bool {
			for _, metricID := range part {
				m.Add(metricID)
			}
			return true
		})
		if !hmCurr.m.Equal(m) {
			t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
		}
		if hmCurr.isFull {
			t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, false)
		}

		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
		hmEmpty := &hourMetricIDs{}
		if !reflect.DeepEqual(hmPrev, hmEmpty) {
			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
		}

		if s.pendingHourEntries.Len() != 0 {
			t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
		}
	})
}

// TestMetricRowMarshalUnmarshal verifies that randomly generated MetricRow values
// survive a Marshal / UnmarshalX round trip without leaving a tail.
func TestMetricRowMarshalUnmarshal(t *testing.T) {
	var buf []byte
	typ := reflect.TypeOf(&MetricRow{})
	rnd := rand.New(rand.NewSource(1))

	for i := 0; i < 1000; i++ {
		v, ok := quick.Value(typ, rnd)
		if !ok {
			t.Fatalf("cannot create random MetricRow via quick.Value")
		}
		mr1 := v.Interface().(*MetricRow)
		if mr1 == nil {
			continue
		}

		buf = mr1.Marshal(buf[:0])
		var mr2 MetricRow
		tail, err := mr2.UnmarshalX(buf)
		if err != nil {
			t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
		}
		if len(tail) > 0 {
			t.Fatalf("non-empty tail returned after MetricRow.Unmarshal for mr1=%s", mr1)
		}

		// Normalize nil to empty slice, since reflect.DeepEqual distinguishes them.
		if mr1.MetricNameRaw == nil {
			mr1.MetricNameRaw = []byte{}
		}
		if mr2.MetricNameRaw == nil {
			mr2.MetricNameRaw = []byte{}
		}
		if !reflect.DeepEqual(mr1, &mr2) {
			t.Fatalf("mr1 should match mr2; got\nmr1=%s\nmr2=%s", mr1, &mr2)
		}
	}
}

// TestNextRetentionDuration verifies that nextRetentionDuration returns a positive
// duration for retentions from 0.1 months up to (but excluding) 120 months.
func TestNextRetentionDuration(t *testing.T) {
	for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 {
		d := nextRetentionDuration(int64(retentionMonths * msecsPerMonth))
		if d <= 0 {
			currTime := time.Now().UTC()
			nextTime := time.Now().UTC().Add(d)
			t.Fatalf("unexpected retention duration for retentionMonths=%f; got %s; must be %s + %f months", retentionMonths, nextTime, currTime, retentionMonths)
		}
	}
}

// TestStorageOpenClose opens and closes the storage at the same path 10 times in a row.
func TestStorageOpenClose(t *testing.T) {
	path := "TestStorageOpenClose"
	for i := 0; i < 10; i++ {
		s, err := OpenStorage(path, -1, 1e5, 1e6)
		if err != nil {
			t.Fatalf("cannot open storage: %s", err)
		}
		s.MustClose()
	}
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// TestStorageOpenMultipleTimes verifies that opening an already opened storage
// returns an error.
func TestStorageOpenMultipleTimes(t *testing.T) {
	path := "TestStorageOpenMultipleTimes"
	s1, err := OpenStorage(path, -1, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage the first time: %s", err)
	}

	for i := 0; i < 10; i++ {
		s2, err := OpenStorage(path, -1, 0, 0)
		if err == nil {
			s2.MustClose()
			t.Fatalf("expecting non-nil error when opening already opened storage")
		}
	}
	s1.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// TestStorageRandTimestamps adds rows with random timestamps to the storage,
// first serially with storage re-opening between iterations, then from 3 goroutines.
func TestStorageRandTimestamps(t *testing.T) {
	path := "TestStorageRandTimestamps"
	retentionMsecs := int64(60 * msecsPerMonth)
	s, err := OpenStorage(path, retentionMsecs, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	t.Run("serial", func(t *testing.T) {
		for i := 0; i < 3; i++ {
			if err := testStorageRandTimestamps(s); err != nil {
				t.Fatal(err)
			}
			s.MustClose()
			s, err = OpenStorage(path, retentionMsecs, 0, 0)
			// Fail explicitly instead of dereferencing an unusable storage later.
			if err != nil {
				t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
			}
		}
	})
	t.Run("concurrent", func(t *testing.T) {
		ch := make(chan error, 3)
		for i := 0; i < cap(ch); i++ {
			go func() {
				var err error
				for i := 0; i < 2; i++ {
					err = testStorageRandTimestamps(s)
					// Stop at the first failure, so it isn't overwritten by the next iteration.
					if err != nil {
						break
					}
				}
				ch <- err
			}()
		}
		for i := 0; i < cap(ch); i++ {
			select {
			case err := <-ch:
				if err != nil {
					t.Fatal(err)
				}
			case <-time.After(time.Second * 10):
				t.Fatal("timeout")
			}
		}
	})
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// testStorageRandTimestamps adds rows with random timestamps and values to s.
//
// AddRows errors containing "too big timestamp" or "too small timestamp" are ignored.
// An error is returned if the table reports zero small rows afterwards.
func testStorageRandTimestamps(s *Storage) error {
	const rowsPerAdd = 1e3
	const addsCount = 2
	typ := reflect.TypeOf(int64(0))
	rnd := rand.New(rand.NewSource(1))

	for i := 0; i < addsCount; i++ {
		var mrs []MetricRow
		var mn MetricName
		mn.Tags = []Tag{
			{[]byte("job"), []byte("webservice")},
			{[]byte("instance"), []byte("1.2.3.4")},
		}
		for j := 0; j < rowsPerAdd; j++ {
			mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
			metricNameRaw := mn.marshalRaw(nil)
			timestamp := int64(rnd.NormFloat64() * 1e12)
			if j%2 == 0 {
				// Every second row gets an arbitrary int64 timestamp.
				ts, ok := quick.Value(typ, rnd)
				if !ok {
					return fmt.Errorf("cannot create random timestamp via quick.Value")
				}
				timestamp = ts.Interface().(int64)
			}
			value := rnd.NormFloat64() * 1e12

			mr := MetricRow{
				MetricNameRaw: metricNameRaw,
				Timestamp:     timestamp,
				Value:         value,
			}
			mrs = append(mrs, mr)
		}
		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
			errStr := err.Error()
			if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
				return fmt.Errorf("unexpected error when adding mrs: %w", err)
			}
		}
	}

	// Verify the storage contains rows.
	var m Metrics
	s.UpdateMetrics(&m)
	if m.TableMetrics.SmallRowsCount == 0 {
		return fmt.Errorf("expecting at least one row in the table")
	}
	return nil
}

// TestStorageDeleteSeries verifies series deletion, first serially with storage
// re-opening between iterations, then from 3 goroutines, and checks that no label
// names are left afterwards.
func TestStorageDeleteSeries(t *testing.T) {
	path := "TestStorageDeleteSeries"
	s, err := OpenStorage(path, 0, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}

	// Verify no label names exist
	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
		t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
	}
	if len(lns) != 0 {
		t.Fatalf("found non-empty tag keys at the start: %q", lns)
	}

	t.Run("serial", func(t *testing.T) {
		for i := 0; i < 3; i++ {
			if err = testStorageDeleteSeries(s, 0); err != nil {
				t.Fatalf("unexpected error on iteration %d: %s", i, err)
			}

			// Re-open the storage in order to check how deleted metricIDs
			// are persisted.
			s.MustClose()
			s, err = OpenStorage(path, 0, 0, 0)
			if err != nil {
				t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
			}
		}
	})

	t.Run("concurrent", func(t *testing.T) {
		ch := make(chan error, 3)
		for i := 0; i < cap(ch); i++ {
			go func(workerNum int) {
				var err error
				for j := 0; j < 2; j++ {
					err = testStorageDeleteSeries(s, workerNum)
					if err != nil {
						break
					}
				}
				ch <- err
			}(i)
		}
		for i := 0; i < cap(ch); i++ {
			select {
			case err := <-ch:
				if err != nil {
					t.Fatalf("unexpected error: %s", err)
				}
			case <-time.After(30 * time.Second):
				t.Fatalf("timeout")
			}
		}
	})

	// Verify no more tag keys exist
	lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
		t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err)
	}
	if len(lns) != 0 {
		t.Fatalf("found non-empty tag keys after the test: %q", lns)
	}

	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// testStorageDeleteSeries adds metricsCount series tagged for the given workerNum,
// verifies they are searchable, deletes them one by one via DeleteSeries and then
// verifies that neither metric blocks nor worker tag values are left.
func testStorageDeleteSeries(s *Storage, workerNum int) error {
	const rowsPerMetric = 100
	const metricsCount = 30

	workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))

	// lnsAll collects every label name expected to be found after the rows are added.
	lnsAll := make(map[string]bool)
	lnsAll["__name__"] = true
	for i := 0; i < metricsCount; i++ {
		var mrs []MetricRow
		var mn MetricName
		job := fmt.Sprintf("job_%d_%d", i, workerNum)
		instance := fmt.Sprintf("instance_%d_%d", i, workerNum)
		mn.Tags = []Tag{
			{[]byte("job"), []byte(job)},
			{[]byte("instance"), []byte(instance)},
			{workerTag, []byte("foobar")},
		}
		for _, tag := range mn.Tags {
			lnsAll[string(tag.Key)] = true
		}
		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
		metricNameRaw := mn.marshalRaw(nil)

		for j := 0; j < rowsPerMetric; j++ {
			timestamp := rand.Int63n(1e10)
			value := rand.NormFloat64() * 1e6

			mr := MetricRow{
				MetricNameRaw: metricNameRaw,
				Timestamp:     timestamp,
				Value:         value,
			}
			mrs = append(mrs, mr)
		}
		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
			return fmt.Errorf("unexpected error when adding mrs: %w", err)
		}
	}
	s.DebugFlush()

	// Verify tag values exist
	tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err)
	}
	if len(tvs) == 0 {
		return fmt.Errorf("unexpected empty number of tag values for workerTag")
	}

	// Verify tag keys exist
	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err)
	}
	if err := checkLabelNames(lns, lnsAll); err != nil {
		return fmt.Errorf("unexpected label names before metrics removal: %w", err)
	}

	var sr Search
	tr := TimeRange{
		MinTimestamp: 0,
		MaxTimestamp: 2e10,
	}
	metricBlocksCount := func(tfs *TagFilters) int {
		// Verify the number of blocks
		n := 0
		sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
		for sr.NextMetricBlock() {
			n++
		}
		sr.MustClose()
		return n
	}
	for i := 0; i < metricsCount; i++ {
		tfs := NewTagFilters()
		if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
			return fmt.Errorf("cannot add regexp tag filter: %w", err)
		}
		job := fmt.Sprintf("job_%d_%d", i, workerNum)
		if err := tfs.Add([]byte("job"), []byte(job), false, false); err != nil {
			return fmt.Errorf("cannot add job tag filter: %w", err)
		}
		if n := metricBlocksCount(tfs); n == 0 {
			return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
		}
		deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs})
		if err != nil {
			return fmt.Errorf("cannot delete metrics: %w", err)
		}
		if deletedCount == 0 {
			return fmt.Errorf("expecting non-zero number of deleted metrics on iteration %d", i)
		}
		if n := metricBlocksCount(tfs); n != 0 {
			return fmt.Errorf("expecting zero metric blocks after DeleteSeries call for tfs=%s; got %d blocks", tfs, n)
		}

		// Try deleting empty tfss
		deletedCount, err = s.DeleteSeries(nil, nil)
		if err != nil {
			return fmt.Errorf("cannot delete empty tfss: %w", err)
		}
		if deletedCount != 0 {
			return fmt.Errorf("expecting zero deleted metrics for empty tfss; got %d", deletedCount)
		}
	}

	// Make sure no more metrics left for the given workerNum
	tfs := NewTagFilters()
	if err := tfs.Add(nil, []byte(fmt.Sprintf("metric_.+_%d", workerNum)), false, true); err != nil {
		return fmt.Errorf("cannot add regexp tag filter for worker metrics: %w", err)
	}
	if n := metricBlocksCount(tfs); n != 0 {
		return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
	}
	tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err)
	}
	if len(tvs) != 0 {
		return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
	}

	return nil
}

// checkLabelNames returns an error if lns misses any of the label names in lnsExpected.
//
// lns may contain additional label names not listed in lnsExpected.
func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
	if len(lns) < len(lnsExpected) {
		return fmt.Errorf("unexpected number of label names found; got %d; want at least %d; lns=%q, lnsExpected=%v", len(lns), len(lnsExpected), lns, lnsExpected)
	}
	for labelName := range lnsExpected {
		if !containsString(lns, labelName) {
			return fmt.Errorf("cannot find %q in label names %q", labelName, lns)
		}
	}
	return nil
}

// TestStorageRegisterMetricNamesSerial runs testStorageRegisterMetricNames
// from a single goroutine.
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
	path := "TestStorageRegisterMetricNamesSerial"
	s, err := OpenStorage(path, 0, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	if err := testStorageRegisterMetricNames(s); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// TestStorageRegisterMetricNamesConcurrent runs testStorageRegisterMetricNames
// from 3 goroutines sharing a single storage.
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
	path := "TestStorageRegisterMetricNamesConcurrent"
	s, err := OpenStorage(path, 0, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	ch := make(chan error, 3)
	for i := 0; i < cap(ch); i++ {
		go func() {
			ch <- testStorageRegisterMetricNames(s)
		}()
	}
	for i := 0; i < cap(ch); i++ {
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(10 * time.Second):
			t.Fatalf("timeout")
		}
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// testStorageRegisterMetricNames registers metric names in s via RegisterMetricNames
// and verifies they are visible via label names, label values and metric names searches,
// both with and without an explicit time range.
func testStorageRegisterMetricNames(s *Storage) error {
	const metricsPerAdd = 1e3
	const addsCount = 10

	addIDsMap := make(map[string]struct{})
	for i := 0; i < addsCount; i++ {
		var mrs []MetricRow
		var mn MetricName
		addID := fmt.Sprintf("%d", i)
		addIDsMap[addID] = struct{}{}
		mn.Tags = []Tag{
			{[]byte("job"), []byte("webservice")},
			{[]byte("instance"), []byte("1.2.3.4")},
			{[]byte("add_id"), []byte(addID)},
		}
		now := timestampFromTime(time.Now())
		for j := 0; j < metricsPerAdd; j++ {
			mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", j))
			metricNameRaw := mn.marshalRaw(nil)

			mr := MetricRow{
				MetricNameRaw: metricNameRaw,
				Timestamp:     now,
			}
			mrs = append(mrs, mr)
		}
		if err := s.RegisterMetricNames(nil, mrs); err != nil {
			return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
		}
	}
	var addIDsExpected []string
	for k := range addIDsMap {
		addIDsExpected = append(addIDsExpected, k)
	}
	sort.Strings(addIDsExpected)

	// Verify the storage contains the added metric names.
	s.DebugFlush()

	// Verify that SearchLabelNamesWithFiltersOnTimeRange returns correct result.
	lnsExpected := []string{
		"__name__",
		"add_id",
		"instance",
		"job",
	}
	lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 100, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(lns)
	if !reflect.DeepEqual(lns, lnsExpected) {
		return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
	}

	// Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns correct result.
	now := timestampFromTime(time.Now())
	start := now - msecPerDay
	end := now + 60*1000
	tr := TimeRange{
		MinTimestamp: start,
		MaxTimestamp: end,
	}
	lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 100, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(lns)
	if !reflect.DeepEqual(lns, lnsExpected) {
		return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
	}

	// Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result.
	addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
		return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}

	// Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result.
	addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, tr, addsCount+100, 1e9, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
	}
	sort.Strings(addIDs)
	if !reflect.DeepEqual(addIDs, addIDsExpected) {
		return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
	}

	// Verify that SearchMetricNames returns correct result.
	tfs := NewTagFilters()
	if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
		return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
	}
	metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
	if err != nil {
		return fmt.Errorf("error in SearchMetricNames: %w", err)
	}
	if len(metricNames) < metricsPerAdd {
		return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
	}
	var mn MetricName
	for i, metricName := range metricNames {
		if err := mn.UnmarshalString(metricName); err != nil {
			return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
		}
		addID := mn.GetTagValue("add_id")
		if string(addID) != "0" {
			return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
		}
		job := mn.GetTagValue("job")
		if string(job) != "webservice" {
			return fmt.Errorf("unexpected job for metricName #%d; got %q; want %q", i, job, "webservice")
		}
	}

	return nil
}

// TestStorageAddRowsSerial runs testStorageAddRows from a single goroutine.
func TestStorageAddRowsSerial(t *testing.T) {
	path := "TestStorageAddRowsSerial"
	s, err := OpenStorage(path, 0, 1e5, 1e5)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	if err := testStorageAddRows(s); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// TestStorageAddRowsConcurrent runs testStorageAddRows from 3 goroutines
// sharing a single storage.
func TestStorageAddRowsConcurrent(t *testing.T) {
	path := "TestStorageAddRowsConcurrent"
	s, err := OpenStorage(path, 0, 1e5, 1e5)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	ch := make(chan error, 3)
	for i := 0; i < cap(ch); i++ {
		go func() {
			ch <- testStorageAddRows(s)
		}()
	}
	for i := 0; i < cap(ch); i++ {
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(10 * time.Second):
			t.Fatalf("timeout")
		}
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// testGenerateMetricRows returns the given number of rows for metrics named
// metric_0 ... metric_(rows-1) with random values and random timestamps
// in the range [timestampMin, timestampMax).
func testGenerateMetricRows(rows uint64, timestampMin, timestampMax int64) []MetricRow {
	var mrs []MetricRow
	var mn MetricName
	mn.Tags = []Tag{
		{[]byte("job"), []byte("webservice")},
		{[]byte("instance"), []byte("1.2.3.4")},
	}
	for i := 0; i < int(rows); i++ {
		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
		metricNameRaw := mn.marshalRaw(nil)
		timestamp := rand.Int63n(timestampMax-timestampMin) + timestampMin
		value := rand.NormFloat64() * 1e6

		mr := MetricRow{
			MetricNameRaw: metricNameRaw,
			Timestamp:     timestamp,
			Value:         value,
		}
		mrs = append(mrs, mr)
	}
	return mrs
}

// testStorageAddRows adds rows to s, then creates a snapshot, opens the snapshot
// as a separate storage, force-merges its partitions and finally deletes the snapshot.
//
// The storage opened from the snapshot is closed on every return path after it is opened.
func testStorageAddRows(s *Storage) error {
	const rowsPerAdd = 1e3
	const addsCount = 10

	for i := 0; i < addsCount; i++ {
		mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
			return fmt.Errorf("unexpected error when adding mrs: %w", err)
		}
	}

	// Verify the storage contains rows.
	minRowsExpected := uint64(rowsPerAdd * addsCount)
	var m Metrics
	s.UpdateMetrics(&m)
	if m.TableMetrics.SmallRowsCount < minRowsExpected {
		return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
	}

	// Try creating a snapshot from the storage.
	snapshotName, err := s.CreateSnapshot()
	if err != nil {
		return fmt.Errorf("cannot create snapshot from the storage: %w", err)
	}

	// Verify the snapshot is visible
	snapshots, err := s.ListSnapshots()
	if err != nil {
		return fmt.Errorf("cannot list snapshots: %w", err)
	}
	if !containsString(snapshots, snapshotName) {
		return fmt.Errorf("cannot find snapshot %q in %q", snapshotName, snapshots)
	}

	// Try opening the storage from snapshot.
	snapshotPath := s.path + "/snapshots/" + snapshotName
	s1, err := OpenStorage(snapshotPath, 0, 0, 0)
	if err != nil {
		return fmt.Errorf("cannot open storage from snapshot: %w", err)
	}

	// Verify the snapshot contains rows
	var m1 Metrics
	s1.UpdateMetrics(&m1)
	if m1.TableMetrics.SmallRowsCount < minRowsExpected {
		s1.MustClose()
		return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
	}

	// Verify that force merge for the snapshot leaves only a single part per partition.
	if err := s1.ForceMergePartitions(""); err != nil {
		s1.MustClose()
		return fmt.Errorf("error when force merging partitions: %w", err)
	}
	ptws := s1.tb.GetPartitions(nil)
	for _, ptw := range ptws {
		pws := ptw.pt.GetParts(nil)
		numParts := len(pws)
		ptw.pt.PutParts(pws)
		if numParts != 1 {
			s1.tb.PutPartitions(ptws)
			s1.MustClose()
			return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
		}
	}
	s1.tb.PutPartitions(ptws)

	s1.MustClose()

	// Delete the snapshot and make sure it is no longer visible.
	if err := s.DeleteSnapshot(snapshotName); err != nil {
		return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
	}
	snapshots, err = s.ListSnapshots()
	if err != nil {
		return fmt.Errorf("cannot list snapshots: %w", err)
	}
	if containsString(snapshots, snapshotName) {
		return fmt.Errorf("snapshot %q must be deleted, but is still visible in %q", snapshotName, snapshots)
	}

	return nil
}

// TestStorageRotateIndexDB adds rows from 3 goroutines while another goroutine
// calls mustRotateIndexDB roughly every millisecond.
func TestStorageRotateIndexDB(t *testing.T) {
	path := "TestStorageRotateIndexDB"
	s, err := OpenStorage(path, 0, 0, 0)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}

	// Start indexDB rotater in a separate goroutine
	stopCh := make(chan struct{})
	rotateDoneCh := make(chan struct{})
	go func() {
		for {
			select {
			case <-stopCh:
				close(rotateDoneCh)
				return
			default:
				time.Sleep(time.Millisecond)
				s.mustRotateIndexDB()
			}
		}
	}()

	// Run concurrent workers that insert / select data from the storage.
	ch := make(chan error, 3)
	for i := 0; i < cap(ch); i++ {
		go func(workerNum int) {
			ch <- testStorageAddMetrics(s, workerNum)
		}(i)
	}
	for i := 0; i < cap(ch); i++ {
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("unexpected error: %s", err)
			}
		case <-time.After(10 * time.Second):
			t.Fatalf("timeout")
		}
	}

	// Wait until the rotater goroutine is stopped before closing the storage.
	close(stopCh)
	<-rotateDoneCh

	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// testStorageAddMetrics adds rowsCount rows to s one by one for metrics tagged
// with the given workerNum and verifies the table reports at least rowsCount small rows.
func testStorageAddMetrics(s *Storage, workerNum int) error {
	const rowsCount = 1e3

	var mn MetricName
	mn.Tags = []Tag{
		{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
		{[]byte("instance"), []byte("1.2.3.4")},
	}
	for i := 0; i < rowsCount; i++ {
		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rand.Intn(10)))
		metricNameRaw := mn.marshalRaw(nil)
		timestamp := rand.Int63n(1e10)
		value := rand.NormFloat64() * 1e6

		mr := MetricRow{
			MetricNameRaw: metricNameRaw,
			Timestamp:     timestamp,
			Value:         value,
		}
		if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
			return fmt.Errorf("unexpected error when adding mrs: %w", err)
		}
	}

	// Verify the storage contains rows.
	minRowsExpected := uint64(rowsCount)
	var m Metrics
	s.UpdateMetrics(&m)
	if m.TableMetrics.SmallRowsCount < minRowsExpected {
		return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, m.TableMetrics.SmallRowsCount)
	}
	return nil
}

// TestStorageDeleteStaleSnapshots verifies that DeleteStaleSnapshots keeps a fresh
// snapshot for a 1 month max age and removes it for a 1 nanosecond max age.
func TestStorageDeleteStaleSnapshots(t *testing.T) {
	path := "TestStorageDeleteStaleSnapshots"
	s, err := OpenStorage(path, 0, 1e5, 1e5)
	if err != nil {
		t.Fatalf("cannot open storage: %s", err)
	}
	const rowsPerAdd = 1e3
	const addsCount = 10
	for i := 0; i < addsCount; i++ {
		mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
		if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
			t.Fatalf("unexpected error when adding mrs: %s", err)
		}
	}
	// Try creating a snapshot from the storage.
	snapshotName, err := s.CreateSnapshot()
	if err != nil {
		t.Fatalf("cannot create snapshot from the storage: %s", err)
	}
	// Delete snapshots older than 1 month
	if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil {
		t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err)
	}
	snapshots, err := s.ListSnapshots()
	if err != nil {
		t.Fatalf("cannot list snapshots: %s", err)
	}
	if len(snapshots) != 1 {
		t.Fatalf("expecting one snapshot; got %q", snapshots)
	}
	if snapshots[0] != snapshotName {
		t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots)
	}

	// Delete the snapshot which is older than 1 nanoseconds
	time.Sleep(2 * time.Nanosecond)
	if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil {
		t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err)
	}
	snapshots, err = s.ListSnapshots()
	if err != nil {
		t.Fatalf("cannot list snapshots: %s", err)
	}
	if len(snapshots) != 0 {
		t.Fatalf("expecting zero snapshots; got %q", snapshots)
	}
	s.MustClose()
	if err := os.RemoveAll(path); err != nil {
		t.Fatalf("cannot remove %q: %s", path, err)
	}
}

// containsString returns true if a contains s.
func containsString(a []string, s string) bool {
	for i := range a {
		if a[i] == s {
			return true
		}
	}
	return false
}