mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
39cdc546dd
* lib/{fs,mergeset,storage}: skip `.must-remove.` dirs when creating snapshot (#3858) * lib/{mergeset,storage}: add timeout configuration for snapshots creation, remove incomplete snapshots from storage * docs: fix formatting * app/vmstorage: add metrics to track status of snapshots * app/vmstorage: use `vm_http_requests_total` metric for snapshot endpoints metrics, rename new flag to make name more clear Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: update flag name in docs Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> * app/vmstorage: reflect new metrics names change in docs Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> --------- Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
1233 lines
35 KiB
Go
1233 lines
35 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
"testing/quick"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
|
)
|
|
|
|
func TestReplaceAlternateRegexpsWithGraphiteWildcards(t *testing.T) {
|
|
f := func(q, resultExpected string) {
|
|
t.Helper()
|
|
result := replaceAlternateRegexpsWithGraphiteWildcards([]byte(q))
|
|
if string(result) != resultExpected {
|
|
t.Fatalf("unexpected result for %s\ngot\n%s\nwant\n%s", q, result, resultExpected)
|
|
}
|
|
}
|
|
f("", "")
|
|
f("foo", "foo")
|
|
f("foo(bar", "foo(bar")
|
|
f("foo.(bar|baz", "foo.(bar|baz")
|
|
f("foo.(bar).x", "foo.{bar}.x")
|
|
f("foo.(bar|baz).*.{x,y}", "foo.{bar,baz}.*.{x,y}")
|
|
f("foo.(bar|baz).*.{x,y}(z|aa)", "foo.{bar,baz}.*.{x,y}{z,aa}")
|
|
f("foo(.*)", "foo*")
|
|
}
|
|
|
|
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
|
|
f := func(q, expectedRegexp string) {
|
|
t.Helper()
|
|
re, err := getRegexpForGraphiteQuery(q)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error for query=%q: %s", q, err)
|
|
}
|
|
reStr := re.String()
|
|
if reStr != expectedRegexp {
|
|
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
|
|
}
|
|
}
|
|
f(``, `^$`)
|
|
f(`*`, `^[^.]*$`)
|
|
f(`foo.`, `^foo\.$`)
|
|
f(`foo.bar`, `^foo\.bar$`)
|
|
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
|
|
f(`[-a-zx.]`, `^[-a-zx.]$`)
|
|
f(`**`, `^[^.]*[^.]*$`)
|
|
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
|
|
f(`foo{bar`, `^foo\{bar$`)
|
|
f(`foo{ba,r`, `^foo\{ba,r$`)
|
|
f(`foo[bar`, `^foo\[bar$`)
|
|
f(`foo{bar}`, `^foobar$`)
|
|
f(`foo{bar,,b{{a,b*},z},[x-y]*z}a`, `^foo(?:bar||b(?:(?:a|b[^.]*)|z)|[x-y][^.]*z)a$`)
|
|
}
|
|
|
|
func TestDateMetricIDCacheSerial(t *testing.T) {
|
|
c := newDateMetricIDCache()
|
|
if err := testDateMetricIDCache(c, false); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
}
|
|
|
|
func TestDateMetricIDCacheConcurrent(t *testing.T) {
|
|
c := newDateMetricIDCache()
|
|
ch := make(chan error, 5)
|
|
for i := 0; i < 5; i++ {
|
|
go func() {
|
|
ch <- testDateMetricIDCache(c, true)
|
|
}()
|
|
}
|
|
for i := 0; i < 5; i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(time.Second * 5):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
|
type dmk struct {
|
|
date uint64
|
|
metricID uint64
|
|
}
|
|
m := make(map[dmk]bool)
|
|
for i := 0; i < 1e5; i++ {
|
|
date := uint64(i) % 3
|
|
metricID := uint64(i) % 1237
|
|
if !concurrent && c.Has(date, metricID) {
|
|
if !m[dmk{date, metricID}] {
|
|
return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
|
|
}
|
|
continue
|
|
}
|
|
c.Set(date, metricID)
|
|
m[dmk{date, metricID}] = true
|
|
if !concurrent && !c.Has(date, metricID) {
|
|
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
|
|
}
|
|
if i%11234 == 0 {
|
|
c.mu.Lock()
|
|
c.syncLocked()
|
|
c.mu.Unlock()
|
|
}
|
|
if i%34323 == 0 {
|
|
c.Reset()
|
|
m = make(map[dmk]bool)
|
|
}
|
|
}
|
|
|
|
// Verify fast path after sync.
|
|
for i := 0; i < 1e5; i++ {
|
|
date := uint64(i) % 3
|
|
metricID := uint64(i) % 123
|
|
c.Set(date, metricID)
|
|
}
|
|
c.mu.Lock()
|
|
c.syncLocked()
|
|
c.mu.Unlock()
|
|
for i := 0; i < 1e5; i++ {
|
|
date := uint64(i) % 3
|
|
metricID := uint64(i) % 123
|
|
if !concurrent && !c.Has(date, metricID) {
|
|
return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
|
|
}
|
|
}
|
|
|
|
// Verify c.Reset
|
|
if n := c.EntriesCount(); !concurrent && n < 123 {
|
|
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
|
}
|
|
c.Reset()
|
|
if n := c.EntriesCount(); !concurrent && n > 0 {
|
|
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|
newStorage := func() *Storage {
|
|
var s Storage
|
|
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
|
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
|
s.pendingHourEntries = &uint64set.Set{}
|
|
return &s
|
|
}
|
|
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
|
s := newStorage()
|
|
hour := fasttime.UnixHour()
|
|
if hour%24 == 0 {
|
|
hour++
|
|
}
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour - 1,
|
|
}
|
|
hmOrig.m.Add(12)
|
|
hmOrig.m.Add(34)
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
if hmCurr.m.Len() != 0 {
|
|
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
|
|
}
|
|
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
|
}
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
|
s := newStorage()
|
|
hour := fasttime.UnixHour()
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour,
|
|
}
|
|
hmOrig.m.Add(12)
|
|
hmOrig.m.Add(34)
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
if !reflect.DeepEqual(hmCurr, hmOrig) {
|
|
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
|
|
}
|
|
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
hmEmpty := &hourMetricIDs{}
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
}
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
|
s := newStorage()
|
|
pendingHourEntries := &uint64set.Set{}
|
|
pendingHourEntries.Add(343)
|
|
pendingHourEntries.Add(32424)
|
|
pendingHourEntries.Add(8293432)
|
|
s.pendingHourEntries = pendingHourEntries
|
|
|
|
hour := fasttime.UnixHour()
|
|
if hour%24 == 0 {
|
|
hour++
|
|
}
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour - 1,
|
|
}
|
|
hmOrig.m.Add(12)
|
|
hmOrig.m.Add(34)
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
if !hmCurr.m.Equal(pendingHourEntries) {
|
|
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
|
|
}
|
|
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
|
}
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
|
s := newStorage()
|
|
pendingHourEntries := &uint64set.Set{}
|
|
pendingHourEntries.Add(343)
|
|
pendingHourEntries.Add(32424)
|
|
pendingHourEntries.Add(8293432)
|
|
s.pendingHourEntries = pendingHourEntries
|
|
|
|
hour := fasttime.UnixHour()
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour,
|
|
}
|
|
hmOrig.m.Add(12)
|
|
hmOrig.m.Add(34)
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
m := pendingHourEntries.Clone()
|
|
hmOrig.m.ForEach(func(part []uint64) bool {
|
|
for _, metricID := range part {
|
|
m.Add(metricID)
|
|
}
|
|
return true
|
|
})
|
|
if !hmCurr.m.Equal(m) {
|
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
|
}
|
|
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
hmEmpty := &hourMetricIDs{}
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
}
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
t.Run("nonempty_pending_metric_ids_valid_curr_hour_start_of_day", func(t *testing.T) {
|
|
s := newStorage()
|
|
pendingHourEntries := &uint64set.Set{}
|
|
pendingHourEntries.Add(343)
|
|
pendingHourEntries.Add(32424)
|
|
pendingHourEntries.Add(8293432)
|
|
s.pendingHourEntries = pendingHourEntries
|
|
|
|
hour := fasttime.UnixHour()
|
|
hour -= hour % 24
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour,
|
|
}
|
|
hmOrig.m.Add(12)
|
|
hmOrig.m.Add(34)
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
m := pendingHourEntries.Clone()
|
|
hmOrig.m.ForEach(func(part []uint64) bool {
|
|
for _, metricID := range part {
|
|
m.Add(metricID)
|
|
}
|
|
return true
|
|
})
|
|
if !hmCurr.m.Equal(m) {
|
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
|
}
|
|
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
hmEmpty := &hourMetricIDs{}
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
}
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
|
|
s := newStorage()
|
|
|
|
hour := fasttime.UnixHour()
|
|
hour -= hour % 24
|
|
|
|
pendingHourEntries := &uint64set.Set{}
|
|
pendingHourEntries.Add(343)
|
|
pendingHourEntries.Add(32424)
|
|
pendingHourEntries.Add(8293432)
|
|
s.pendingHourEntries = pendingHourEntries
|
|
|
|
hmOrig := &hourMetricIDs{
|
|
m: &uint64set.Set{},
|
|
hour: hour - 1,
|
|
}
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
s.updateCurrHourMetricIDs(hour)
|
|
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
|
|
if hmCurr.hour != hour {
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
}
|
|
if hmCurr.m.Len() != 0 {
|
|
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
|
|
}
|
|
hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
|
}
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestMetricRowMarshalUnmarshal(t *testing.T) {
|
|
var buf []byte
|
|
typ := reflect.TypeOf(&MetricRow{})
|
|
rng := rand.New(rand.NewSource(1))
|
|
|
|
for i := 0; i < 1000; i++ {
|
|
v, ok := quick.Value(typ, rng)
|
|
if !ok {
|
|
t.Fatalf("cannot create random MetricRow via quick.Value")
|
|
}
|
|
mr1 := v.Interface().(*MetricRow)
|
|
if mr1 == nil {
|
|
continue
|
|
}
|
|
|
|
buf = mr1.Marshal(buf[:0])
|
|
var mr2 MetricRow
|
|
tail, err := mr2.UnmarshalX(buf)
|
|
if err != nil {
|
|
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
|
|
}
|
|
if len(tail) > 0 {
|
|
t.Fatalf("non-empty tail returned after MetricRow.Unmarshal for mr1=%s", mr1)
|
|
}
|
|
if mr1.MetricNameRaw == nil {
|
|
mr1.MetricNameRaw = []byte{}
|
|
}
|
|
if mr2.MetricNameRaw == nil {
|
|
mr2.MetricNameRaw = []byte{}
|
|
}
|
|
if !reflect.DeepEqual(mr1, &mr2) {
|
|
t.Fatalf("mr1 should match mr2; got\nmr1=%s\nmr2=%s", mr1, &mr2)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestNextRetentionDuration(t *testing.T) {
|
|
for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 {
|
|
d := nextRetentionDuration(int64(retentionMonths * msecsPerMonth))
|
|
if d <= 0 {
|
|
currTime := time.Now().UTC()
|
|
nextTime := time.Now().UTC().Add(d)
|
|
t.Fatalf("unexpected retention duration for retentionMonths=%f; got %s; must be %s + %f months", retentionMonths, nextTime, currTime, retentionMonths)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestStorageOpenClose(t *testing.T) {
|
|
path := "TestStorageOpenClose"
|
|
for i := 0; i < 10; i++ {
|
|
s, err := OpenStorage(path, -1, 1e5, 1e6)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
s.MustClose()
|
|
}
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func TestStorageOpenMultipleTimes(t *testing.T) {
|
|
path := "TestStorageOpenMultipleTimes"
|
|
s1, err := OpenStorage(path, -1, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage the first time: %s", err)
|
|
}
|
|
|
|
for i := 0; i < 10; i++ {
|
|
s2, err := OpenStorage(path, -1, 0, 0)
|
|
if err == nil {
|
|
s2.MustClose()
|
|
t.Fatalf("expecting non-nil error when opening already opened storage")
|
|
}
|
|
}
|
|
s1.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func TestStorageRandTimestamps(t *testing.T) {
|
|
path := "TestStorageRandTimestamps"
|
|
retentionMsecs := int64(10 * msecsPerMonth)
|
|
s, err := OpenStorage(path, retentionMsecs, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
t.Run("serial", func(t *testing.T) {
|
|
for i := 0; i < 3; i++ {
|
|
if err := testStorageRandTimestamps(s); err != nil {
|
|
t.Fatalf("error on iteration %d: %s", i, err)
|
|
}
|
|
s.MustClose()
|
|
s, err = OpenStorage(path, retentionMsecs, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage on iteration %d: %s", i, err)
|
|
}
|
|
}
|
|
})
|
|
t.Run("concurrent", func(t *testing.T) {
|
|
ch := make(chan error, 3)
|
|
for i := 0; i < cap(ch); i++ {
|
|
go func() {
|
|
var err error
|
|
for i := 0; i < 2; i++ {
|
|
err = testStorageRandTimestamps(s)
|
|
}
|
|
ch <- err
|
|
}()
|
|
}
|
|
tt := time.NewTimer(time.Second * 10)
|
|
for i := 0; i < cap(ch); i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("error on iteration %d: %s", i, err)
|
|
}
|
|
case <-tt.C:
|
|
t.Fatalf("timeout on iteration %d", i)
|
|
}
|
|
}
|
|
})
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func testStorageRandTimestamps(s *Storage) error {
|
|
currentTime := timestampFromTime(time.Now())
|
|
const rowsPerAdd = 5e3
|
|
const addsCount = 3
|
|
rng := rand.New(rand.NewSource(1))
|
|
|
|
for i := 0; i < addsCount; i++ {
|
|
var mrs []MetricRow
|
|
var mn MetricName
|
|
mn.Tags = []Tag{
|
|
{[]byte("job"), []byte("webservice")},
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
}
|
|
for j := 0; j < rowsPerAdd; j++ {
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rng.Intn(100)))
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
timestamp := currentTime - int64((rng.Float64()-0.2)*float64(2*s.retentionMsecs))
|
|
value := rng.NormFloat64() * 1e11
|
|
|
|
mr := MetricRow{
|
|
MetricNameRaw: metricNameRaw,
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
mrs = append(mrs, mr)
|
|
}
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
|
errStr := err.Error()
|
|
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify the storage contains rows.
|
|
var m Metrics
|
|
s.UpdateMetrics(&m)
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount == 0 {
|
|
return fmt.Errorf("expecting at least one row in storage")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestStorageDeleteSeries(t *testing.T) {
|
|
path := "TestStorageDeleteSeries"
|
|
s, err := OpenStorage(path, 0, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
|
|
// Verify no label names exist
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
|
if err != nil {
|
|
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
|
|
}
|
|
if len(lns) != 0 {
|
|
t.Fatalf("found non-empty tag keys at the start: %q", lns)
|
|
}
|
|
|
|
t.Run("serial", func(t *testing.T) {
|
|
for i := 0; i < 3; i++ {
|
|
if err = testStorageDeleteSeries(s, 0); err != nil {
|
|
t.Fatalf("unexpected error on iteration %d: %s", i, err)
|
|
}
|
|
|
|
// Re-open the storage in order to check how deleted metricIDs
|
|
// are persisted.
|
|
s.MustClose()
|
|
s, err = OpenStorage(path, 0, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage after closing on iteration %d: %s", i, err)
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("concurrent", func(t *testing.T) {
|
|
ch := make(chan error, 3)
|
|
for i := 0; i < cap(ch); i++ {
|
|
go func(workerNum int) {
|
|
var err error
|
|
for j := 0; j < 2; j++ {
|
|
err = testStorageDeleteSeries(s, workerNum)
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
ch <- err
|
|
}(i)
|
|
}
|
|
tt := time.NewTimer(30 * time.Second)
|
|
for i := 0; i < cap(ch); i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error on iteration %d: %s", i, err)
|
|
}
|
|
case <-tt.C:
|
|
t.Fatalf("timeout on iteration %d", i)
|
|
}
|
|
}
|
|
})
|
|
|
|
// Verify no more tag keys exist
|
|
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
|
if err != nil {
|
|
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err)
|
|
}
|
|
if len(lns) != 0 {
|
|
t.Fatalf("found non-empty tag keys after the test: %q", lns)
|
|
}
|
|
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func testStorageDeleteSeries(s *Storage, workerNum int) error {
|
|
rng := rand.New(rand.NewSource(1))
|
|
const rowsPerMetric = 100
|
|
const metricsCount = 30
|
|
|
|
workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))
|
|
|
|
lnsAll := make(map[string]bool)
|
|
lnsAll["__name__"] = true
|
|
for i := 0; i < metricsCount; i++ {
|
|
var mrs []MetricRow
|
|
var mn MetricName
|
|
job := fmt.Sprintf("job_%d_%d", i, workerNum)
|
|
instance := fmt.Sprintf("instance_%d_%d", i, workerNum)
|
|
mn.Tags = []Tag{
|
|
{[]byte("job"), []byte(job)},
|
|
{[]byte("instance"), []byte(instance)},
|
|
{workerTag, []byte("foobar")},
|
|
}
|
|
for i := range mn.Tags {
|
|
lnsAll[string(mn.Tags[i].Key)] = true
|
|
}
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
|
|
for j := 0; j < rowsPerMetric; j++ {
|
|
timestamp := rng.Int63n(1e10)
|
|
value := rng.NormFloat64() * 1e6
|
|
|
|
mr := MetricRow{
|
|
MetricNameRaw: metricNameRaw,
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
mrs = append(mrs, mr)
|
|
}
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
}
|
|
}
|
|
s.DebugFlush()
|
|
|
|
// Verify tag values exist
|
|
tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err)
|
|
}
|
|
if len(tvs) == 0 {
|
|
return fmt.Errorf("unexpected empty number of tag values for workerTag")
|
|
}
|
|
|
|
// Verify tag keys exist
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err)
|
|
}
|
|
if err := checkLabelNames(lns, lnsAll); err != nil {
|
|
return fmt.Errorf("unexpected label names before metrics removal: %w", err)
|
|
}
|
|
|
|
var sr Search
|
|
tr := TimeRange{
|
|
MinTimestamp: 0,
|
|
MaxTimestamp: 2e10,
|
|
}
|
|
metricBlocksCount := func(tfs *TagFilters) int {
|
|
// Verify the number of blocks
|
|
n := 0
|
|
sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
|
|
for sr.NextMetricBlock() {
|
|
n++
|
|
}
|
|
sr.MustClose()
|
|
return n
|
|
}
|
|
for i := 0; i < metricsCount; i++ {
|
|
tfs := NewTagFilters()
|
|
if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
|
|
return fmt.Errorf("cannot add regexp tag filter: %w", err)
|
|
}
|
|
job := fmt.Sprintf("job_%d_%d", i, workerNum)
|
|
if err := tfs.Add([]byte("job"), []byte(job), false, false); err != nil {
|
|
return fmt.Errorf("cannot add job tag filter: %w", err)
|
|
}
|
|
if n := metricBlocksCount(tfs); n == 0 {
|
|
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
|
}
|
|
deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs})
|
|
if err != nil {
|
|
return fmt.Errorf("cannot delete metrics: %w", err)
|
|
}
|
|
if deletedCount == 0 {
|
|
return fmt.Errorf("expecting non-zero number of deleted metrics on iteration %d", i)
|
|
}
|
|
if n := metricBlocksCount(tfs); n != 0 {
|
|
return fmt.Errorf("expecting zero metric blocks after DeleteSeries call for tfs=%s; got %d blocks", tfs, n)
|
|
}
|
|
|
|
// Try deleting empty tfss
|
|
deletedCount, err = s.DeleteSeries(nil, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
|
}
|
|
if deletedCount != 0 {
|
|
return fmt.Errorf("expecting zero deleted metrics for empty tfss; got %d", deletedCount)
|
|
}
|
|
}
|
|
|
|
// Make sure no more metrics left for the given workerNum
|
|
tfs := NewTagFilters()
|
|
if err := tfs.Add(nil, []byte(fmt.Sprintf("metric_.+_%d", workerNum)), false, true); err != nil {
|
|
return fmt.Errorf("cannot add regexp tag filter for worker metrics: %w", err)
|
|
}
|
|
if n := metricBlocksCount(tfs); n != 0 {
|
|
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
|
|
}
|
|
tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err)
|
|
}
|
|
if len(tvs) != 0 {
|
|
return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkLabelNames verifies that every key of lnsExpected is present in lns.
//
// lns may contain additional label names; only a missing expected name is an error.
// It returns nil on success and a descriptive error otherwise.
func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
	// Cheap pre-check: fewer found names than expected names cannot be a match.
	if len(lns) < len(lnsExpected) {
		return fmt.Errorf("unexpected number of label names found; got %d; want at least %d; lns=%q, lnsExpected=%v", len(lns), len(lnsExpected), lns, lnsExpected)
	}
	// Index the found names once, so each expected name is checked in O(1)
	// instead of scanning lns for every expected name.
	found := make(map[string]struct{}, len(lns))
	for _, labelName := range lns {
		found[labelName] = struct{}{}
	}
	for labelName := range lnsExpected {
		if _, ok := found[labelName]; !ok {
			return fmt.Errorf("cannot find %q in label names %q", labelName, lns)
		}
	}
	return nil
}
|
|
|
|
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
|
|
path := "TestStorageRegisterMetricNamesSerial"
|
|
s, err := OpenStorage(path, 0, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
if err := testStorageRegisterMetricNames(s); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
|
|
path := "TestStorageRegisterMetricNamesConcurrent"
|
|
s, err := OpenStorage(path, 0, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
ch := make(chan error, 3)
|
|
for i := 0; i < cap(ch); i++ {
|
|
go func() {
|
|
ch <- testStorageRegisterMetricNames(s)
|
|
}()
|
|
}
|
|
for i := 0; i < cap(ch); i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func testStorageRegisterMetricNames(s *Storage) error {
|
|
const metricsPerAdd = 1e3
|
|
const addsCount = 10
|
|
|
|
addIDsMap := make(map[string]struct{})
|
|
for i := 0; i < addsCount; i++ {
|
|
var mrs []MetricRow
|
|
var mn MetricName
|
|
addID := fmt.Sprintf("%d", i)
|
|
addIDsMap[addID] = struct{}{}
|
|
mn.Tags = []Tag{
|
|
{[]byte("job"), []byte("webservice")},
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
{[]byte("add_id"), []byte(addID)},
|
|
}
|
|
now := timestampFromTime(time.Now())
|
|
for j := 0; j < metricsPerAdd; j++ {
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", j))
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
|
|
mr := MetricRow{
|
|
MetricNameRaw: metricNameRaw,
|
|
Timestamp: now,
|
|
}
|
|
mrs = append(mrs, mr)
|
|
}
|
|
if err := s.RegisterMetricNames(nil, mrs); err != nil {
|
|
return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
|
|
}
|
|
}
|
|
var addIDsExpected []string
|
|
for k := range addIDsMap {
|
|
addIDsExpected = append(addIDsExpected, k)
|
|
}
|
|
sort.Strings(addIDsExpected)
|
|
|
|
// Verify the storage contains the added metric names.
|
|
s.DebugFlush()
|
|
|
|
// Verify that SearchLabelNamesWithFiltersOnTimeRange returns correct result.
|
|
lnsExpected := []string{
|
|
"__name__",
|
|
"add_id",
|
|
"instance",
|
|
"job",
|
|
}
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 100, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
|
|
}
|
|
sort.Strings(lns)
|
|
if !reflect.DeepEqual(lns, lnsExpected) {
|
|
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
|
|
}
|
|
|
|
// Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns correct result.
|
|
now := timestampFromTime(time.Now())
|
|
start := now - msecPerDay
|
|
end := now + 60*1000
|
|
tr := TimeRange{
|
|
MinTimestamp: start,
|
|
MaxTimestamp: end,
|
|
}
|
|
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 100, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
|
|
}
|
|
sort.Strings(lns)
|
|
if !reflect.DeepEqual(lns, lnsExpected) {
|
|
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
|
|
}
|
|
|
|
// Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result.
|
|
addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
|
|
}
|
|
sort.Strings(addIDs)
|
|
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
|
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
|
}
|
|
|
|
// Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result.
|
|
addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, tr, addsCount+100, 1e9, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
|
|
}
|
|
sort.Strings(addIDs)
|
|
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
|
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
|
}
|
|
|
|
// Verify that SearchMetricNames returns correct result.
|
|
tfs := NewTagFilters()
|
|
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
|
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
|
}
|
|
metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
|
if err != nil {
|
|
return fmt.Errorf("error in SearchMetricNames: %w", err)
|
|
}
|
|
if len(metricNames) < metricsPerAdd {
|
|
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
|
|
}
|
|
var mn MetricName
|
|
for i, metricName := range metricNames {
|
|
if err := mn.UnmarshalString(metricName); err != nil {
|
|
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
|
}
|
|
addID := mn.GetTagValue("add_id")
|
|
if string(addID) != "0" {
|
|
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
|
|
}
|
|
job := mn.GetTagValue("job")
|
|
if string(job) != "webservice" {
|
|
return fmt.Errorf("unexpected job for metricName #%d; got %q; want %q", i, job, "webservice")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func TestStorageAddRowsSerial(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
path := "TestStorageAddRowsSerial"
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
|
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
if err := testStorageAddRows(rng, s); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func TestStorageAddRowsConcurrent(t *testing.T) {
|
|
path := "TestStorageAddRowsConcurrent"
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
|
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
ch := make(chan error, 3)
|
|
for i := 0; i < cap(ch); i++ {
|
|
go func(n int) {
|
|
rLocal := rand.New(rand.NewSource(int64(n)))
|
|
ch <- testStorageAddRows(rLocal, s)
|
|
}(i)
|
|
}
|
|
for i := 0; i < cap(ch); i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func testGenerateMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
|
|
var mrs []MetricRow
|
|
var mn MetricName
|
|
mn.Tags = []Tag{
|
|
{[]byte("job"), []byte("webservice")},
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
}
|
|
for i := 0; i < int(rows); i++ {
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
timestamp := rng.Int63n(timestampMax-timestampMin) + timestampMin
|
|
value := rng.NormFloat64() * 1e6
|
|
|
|
mr := MetricRow{
|
|
MetricNameRaw: metricNameRaw,
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
mrs = append(mrs, mr)
|
|
}
|
|
return mrs
|
|
}
|
|
|
|
func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
|
const rowsPerAdd = 1e3
|
|
const addsCount = 10
|
|
|
|
maxTimestamp := timestampFromTime(time.Now())
|
|
minTimestamp := maxTimestamp - s.retentionMsecs
|
|
for i := 0; i < addsCount; i++ {
|
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
}
|
|
}
|
|
|
|
// Verify the storage contains rows.
|
|
minRowsExpected := uint64(rowsPerAdd * addsCount)
|
|
var m Metrics
|
|
s.UpdateMetrics(&m)
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
|
|
}
|
|
|
|
// Try creating a snapshot from the storage.
|
|
snapshotName, err := s.CreateSnapshot(0)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create snapshot from the storage: %w", err)
|
|
}
|
|
|
|
// Verify the snapshot is visible
|
|
snapshots, err := s.ListSnapshots()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot list snapshots: %w", err)
|
|
}
|
|
if !containsString(snapshots, snapshotName) {
|
|
return fmt.Errorf("cannot find snapshot %q in %q", snapshotName, snapshots)
|
|
}
|
|
|
|
// Try opening the storage from snapshot.
|
|
snapshotPath := s.path + "/snapshots/" + snapshotName
|
|
s1, err := OpenStorage(snapshotPath, 0, 0, 0)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot open storage from snapshot: %w", err)
|
|
}
|
|
|
|
// Verify the snapshot contains rows
|
|
var m1 Metrics
|
|
s1.UpdateMetrics(&m1)
|
|
if rowsCount := m1.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, rowsCount)
|
|
}
|
|
|
|
// Verify that force merge for the snapshot leaves only a single part per partition.
|
|
if err := s1.ForceMergePartitions(""); err != nil {
|
|
return fmt.Errorf("error when force merging partitions: %w", err)
|
|
}
|
|
ptws := s1.tb.GetPartitions(nil)
|
|
for _, ptw := range ptws {
|
|
pws := ptw.pt.GetParts(nil, true)
|
|
numParts := len(pws)
|
|
ptw.pt.PutParts(pws)
|
|
if numParts != 1 {
|
|
s1.tb.PutPartitions(ptws)
|
|
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
|
|
}
|
|
}
|
|
s1.tb.PutPartitions(ptws)
|
|
|
|
s1.MustClose()
|
|
|
|
// Delete the snapshot and make sure it is no longer visible.
|
|
if err := s.DeleteSnapshot(snapshotName); err != nil {
|
|
return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
|
|
}
|
|
snapshots, err = s.ListSnapshots()
|
|
if err != nil {
|
|
return fmt.Errorf("cannot list snapshots: %w", err)
|
|
}
|
|
if containsString(snapshots, snapshotName) {
|
|
return fmt.Errorf("snapshot %q must be deleted, but is still visible in %q", snapshotName, snapshots)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func TestStorageRotateIndexDB(t *testing.T) {
|
|
path := "TestStorageRotateIndexDB"
|
|
s, err := OpenStorage(path, 0, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
|
|
// Start indexDB rotater in a separate goroutine
|
|
stopCh := make(chan struct{})
|
|
rotateDoneCh := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
close(rotateDoneCh)
|
|
return
|
|
default:
|
|
time.Sleep(time.Millisecond)
|
|
s.mustRotateIndexDB()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Run concurrent workers that insert / select data from the storage.
|
|
ch := make(chan error, 3)
|
|
for i := 0; i < cap(ch); i++ {
|
|
go func(workerNum int) {
|
|
ch <- testStorageAddMetrics(s, workerNum)
|
|
}(i)
|
|
}
|
|
for i := 0; i < cap(ch); i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(10 * time.Second):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
|
|
close(stopCh)
|
|
<-rotateDoneCh
|
|
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
func testStorageAddMetrics(s *Storage, workerNum int) error {
|
|
rng := rand.New(rand.NewSource(1))
|
|
const rowsCount = 1e3
|
|
|
|
var mn MetricName
|
|
mn.Tags = []Tag{
|
|
{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
}
|
|
for i := 0; i < rowsCount; i++ {
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rng.Intn(10)))
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
timestamp := rng.Int63n(1e10)
|
|
value := rng.NormFloat64() * 1e6
|
|
|
|
mr := MetricRow{
|
|
MetricNameRaw: metricNameRaw,
|
|
Timestamp: timestamp,
|
|
Value: value,
|
|
}
|
|
if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
|
}
|
|
}
|
|
|
|
// Verify the storage contains rows.
|
|
minRowsExpected := uint64(rowsCount)
|
|
var m Metrics
|
|
s.UpdateMetrics(&m)
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
path := "TestStorageDeleteStaleSnapshots"
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
|
s, err := OpenStorage(path, retentionMsecs, 1e5, 1e5)
|
|
if err != nil {
|
|
t.Fatalf("cannot open storage: %s", err)
|
|
}
|
|
const rowsPerAdd = 1e3
|
|
const addsCount = 10
|
|
maxTimestamp := timestampFromTime(time.Now())
|
|
minTimestamp := maxTimestamp - s.retentionMsecs
|
|
for i := 0; i < addsCount; i++ {
|
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
|
t.Fatalf("unexpected error when adding mrs: %s", err)
|
|
}
|
|
}
|
|
// Try creating a snapshot from the storage.
|
|
snapshotName, err := s.CreateSnapshot(0)
|
|
if err != nil {
|
|
t.Fatalf("cannot create snapshot from the storage: %s", err)
|
|
}
|
|
// Delete snapshots older than 1 month
|
|
if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil {
|
|
t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err)
|
|
}
|
|
snapshots, err := s.ListSnapshots()
|
|
if err != nil {
|
|
t.Fatalf("cannot list snapshots: %s", err)
|
|
}
|
|
if len(snapshots) != 1 {
|
|
t.Fatalf("expecting one snapshot; got %q", snapshots)
|
|
}
|
|
if snapshots[0] != snapshotName {
|
|
t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots)
|
|
}
|
|
|
|
// Delete the snapshot which is older than 1 nanoseconds
|
|
time.Sleep(2 * time.Nanosecond)
|
|
if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil {
|
|
t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err)
|
|
}
|
|
snapshots, err = s.ListSnapshots()
|
|
if err != nil {
|
|
t.Fatalf("cannot list snapshots: %s", err)
|
|
}
|
|
if len(snapshots) != 0 {
|
|
t.Fatalf("expecting zero snapshots; got %q", snapshots)
|
|
}
|
|
s.MustClose()
|
|
if err := os.RemoveAll(path); err != nil {
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
}
|
|
}
|
|
|
|
// containsString reports whether s equals any element of a.
// It returns false for a nil or empty slice.
func containsString(a []string, s string) bool {
	for _, item := range a {
		if item == s {
			return true
		}
	}
	return false
}
|