2019-05-22 21:16:55 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"math/rand"
|
|
|
|
"os"
|
2023-03-25 21:33:54 +00:00
|
|
|
"path/filepath"
|
2019-05-22 21:16:55 +00:00
|
|
|
"reflect"
|
2020-11-15 22:42:27 +00:00
|
|
|
"sort"
|
2019-05-22 21:16:55 +00:00
|
|
|
"strings"
|
|
|
|
"testing"
|
|
|
|
"testing/quick"
|
|
|
|
"time"
|
2019-09-24 18:10:22 +00:00
|
|
|
|
2022-11-07 12:04:06 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
2019-09-24 18:10:22 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
2019-05-22 21:16:55 +00:00
|
|
|
)
|
|
|
|
|
2021-12-14 17:51:46 +00:00
|
|
|
func TestReplaceAlternateRegexpsWithGraphiteWildcards(t *testing.T) {
|
|
|
|
f := func(q, resultExpected string) {
|
|
|
|
t.Helper()
|
|
|
|
result := replaceAlternateRegexpsWithGraphiteWildcards([]byte(q))
|
|
|
|
if string(result) != resultExpected {
|
|
|
|
t.Fatalf("unexpected result for %s\ngot\n%s\nwant\n%s", q, result, resultExpected)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
f("", "")
|
|
|
|
f("foo", "foo")
|
|
|
|
f("foo(bar", "foo(bar")
|
|
|
|
f("foo.(bar|baz", "foo.(bar|baz")
|
|
|
|
f("foo.(bar).x", "foo.{bar}.x")
|
|
|
|
f("foo.(bar|baz).*.{x,y}", "foo.{bar,baz}.*.{x,y}")
|
|
|
|
f("foo.(bar|baz).*.{x,y}(z|aa)", "foo.{bar,baz}.*.{x,y}{z,aa}")
|
|
|
|
f("foo(.*)", "foo*")
|
|
|
|
}
|
|
|
|
|
2021-02-02 22:24:05 +00:00
|
|
|
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
|
|
|
|
f := func(q, expectedRegexp string) {
|
|
|
|
t.Helper()
|
2021-02-03 18:12:17 +00:00
|
|
|
re, err := getRegexpForGraphiteQuery(q)
|
2021-02-02 22:24:05 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error for query=%q: %s", q, err)
|
|
|
|
}
|
|
|
|
reStr := re.String()
|
|
|
|
if reStr != expectedRegexp {
|
|
|
|
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
f(``, `^$`)
|
|
|
|
f(`*`, `^[^.]*$`)
|
|
|
|
f(`foo.`, `^foo\.$`)
|
|
|
|
f(`foo.bar`, `^foo\.bar$`)
|
|
|
|
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
|
|
|
|
f(`[-a-zx.]`, `^[-a-zx.]$`)
|
|
|
|
f(`**`, `^[^.]*[^.]*$`)
|
|
|
|
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
|
2021-02-03 18:12:17 +00:00
|
|
|
f(`foo{bar`, `^foo\{bar$`)
|
|
|
|
f(`foo{ba,r`, `^foo\{ba,r$`)
|
|
|
|
f(`foo[bar`, `^foo\[bar$`)
|
|
|
|
f(`foo{bar}`, `^foobar$`)
|
|
|
|
f(`foo{bar,,b{{a,b*},z},[x-y]*z}a`, `^foo(?:bar||b(?:(?:a|b[^.]*)|z)|[x-y][^.]*z)a$`)
|
2021-02-02 22:24:05 +00:00
|
|
|
}
|
|
|
|
|
2019-11-11 11:21:05 +00:00
|
|
|
func TestDateMetricIDCacheSerial(t *testing.T) {
|
|
|
|
c := newDateMetricIDCache()
|
|
|
|
if err := testDateMetricIDCache(c, false); err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestDateMetricIDCacheConcurrent(t *testing.T) {
|
|
|
|
c := newDateMetricIDCache()
|
|
|
|
ch := make(chan error, 5)
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
|
go func() {
|
|
|
|
ch <- testDateMetricIDCache(c, true)
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
for i := 0; i < 5; i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
case <-time.After(time.Second * 5):
|
|
|
|
t.Fatalf("timeout")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
|
|
|
type dmk struct {
|
2023-07-22 22:20:21 +00:00
|
|
|
generation uint64
|
|
|
|
date uint64
|
|
|
|
metricID uint64
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
|
|
|
m := make(map[dmk]bool)
|
|
|
|
for i := 0; i < 1e5; i++ {
|
2023-07-22 22:20:21 +00:00
|
|
|
generation := uint64(i) % 2
|
2019-11-11 11:21:05 +00:00
|
|
|
date := uint64(i) % 3
|
|
|
|
metricID := uint64(i) % 1237
|
2023-07-22 22:20:21 +00:00
|
|
|
if !concurrent && c.Has(generation, date, metricID) {
|
|
|
|
if !m[dmk{generation, date, metricID}] {
|
|
|
|
return fmt.Errorf("c.Has(%d, %d, %d) must return false, but returned true", generation, date, metricID)
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
2023-07-22 22:20:21 +00:00
|
|
|
c.Set(generation, date, metricID)
|
|
|
|
m[dmk{generation, date, metricID}] = true
|
|
|
|
if !concurrent && !c.Has(generation, date, metricID) {
|
|
|
|
return fmt.Errorf("c.Has(%d, %d, %d) must return true, but returned false", generation, date, metricID)
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
|
|
|
if i%11234 == 0 {
|
2021-06-03 13:19:58 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
c.syncLocked()
|
|
|
|
c.mu.Unlock()
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
|
|
|
if i%34323 == 0 {
|
2023-07-22 22:20:21 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
c.resetLocked()
|
|
|
|
c.mu.Unlock()
|
2019-11-11 11:21:05 +00:00
|
|
|
m = make(map[dmk]bool)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify fast path after sync.
|
|
|
|
for i := 0; i < 1e5; i++ {
|
2023-07-22 22:20:21 +00:00
|
|
|
generation := uint64(i) % 2
|
2019-11-11 11:21:05 +00:00
|
|
|
date := uint64(i) % 3
|
|
|
|
metricID := uint64(i) % 123
|
2023-07-22 22:20:21 +00:00
|
|
|
c.Set(generation, date, metricID)
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
2021-06-03 13:19:58 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
c.syncLocked()
|
|
|
|
c.mu.Unlock()
|
2019-11-11 11:21:05 +00:00
|
|
|
for i := 0; i < 1e5; i++ {
|
2023-07-22 22:20:21 +00:00
|
|
|
generation := uint64(i) % 2
|
2019-11-11 11:21:05 +00:00
|
|
|
date := uint64(i) % 3
|
|
|
|
metricID := uint64(i) % 123
|
2023-07-22 22:20:21 +00:00
|
|
|
if !concurrent && !c.Has(generation, date, metricID) {
|
|
|
|
return fmt.Errorf("c.Has(%d, %d, %d) must return true after sync", generation, date, metricID)
|
2019-11-11 11:21:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify c.Reset
|
|
|
|
if n := c.EntriesCount(); !concurrent && n < 123 {
|
|
|
|
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
|
|
|
}
|
2023-07-22 22:20:21 +00:00
|
|
|
c.mu.Lock()
|
|
|
|
c.resetLocked()
|
|
|
|
c.mu.Unlock()
|
2019-11-11 11:21:05 +00:00
|
|
|
if n := c.EntriesCount(); !concurrent && n > 0 {
|
|
|
|
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-06-09 16:06:53 +00:00
|
|
|
func TestUpdateCurrHourMetricIDs(t *testing.T) {
|
|
|
|
newStorage := func() *Storage {
|
|
|
|
var s Storage
|
|
|
|
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
|
|
|
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
2022-11-07 12:04:06 +00:00
|
|
|
s.pendingHourEntries = &uint64set.Set{}
|
2019-06-09 16:06:53 +00:00
|
|
|
return &s
|
|
|
|
}
|
2020-11-15 22:42:27 +00:00
|
|
|
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
2019-06-09 16:06:53 +00:00
|
|
|
s := newStorage()
|
2022-11-07 12:04:06 +00:00
|
|
|
hour := fasttime.UnixHour()
|
2022-12-03 01:07:13 +00:00
|
|
|
if hour%24 == 0 {
|
|
|
|
hour++
|
|
|
|
}
|
2019-06-09 16:06:53 +00:00
|
|
|
hmOrig := &hourMetricIDs{
|
2019-09-24 18:10:22 +00:00
|
|
|
m: &uint64set.Set{},
|
2022-12-03 01:07:13 +00:00
|
|
|
hour: hour - 1,
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
2019-09-24 18:10:22 +00:00
|
|
|
hmOrig.m.Add(12)
|
|
|
|
hmOrig.m.Add(34)
|
2019-06-09 16:06:53 +00:00
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
2022-11-07 11:55:37 +00:00
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if hmCurr.hour != hour {
|
2022-11-07 11:55:37 +00:00
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
2019-09-24 18:10:22 +00:00
|
|
|
if hmCurr.m.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
|
|
|
}
|
|
|
|
|
2019-11-08 17:37:16 +00:00
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
})
|
2020-11-15 22:42:27 +00:00
|
|
|
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
2019-06-09 16:06:53 +00:00
|
|
|
s := newStorage()
|
2022-11-07 12:04:06 +00:00
|
|
|
hour := fasttime.UnixHour()
|
2019-06-09 16:06:53 +00:00
|
|
|
hmOrig := &hourMetricIDs{
|
2019-09-24 18:10:22 +00:00
|
|
|
m: &uint64set.Set{},
|
2019-06-09 16:06:53 +00:00
|
|
|
hour: hour,
|
|
|
|
}
|
2019-09-24 18:10:22 +00:00
|
|
|
hmOrig.m.Add(12)
|
|
|
|
hmOrig.m.Add(34)
|
2019-06-09 16:06:53 +00:00
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
2022-11-07 11:55:37 +00:00
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if hmCurr.hour != hour {
|
2022-11-07 11:55:37 +00:00
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
if !reflect.DeepEqual(hmCurr, hmOrig) {
|
|
|
|
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
|
|
|
|
}
|
|
|
|
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
hmEmpty := &hourMetricIDs{}
|
|
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
|
|
}
|
|
|
|
|
2019-11-08 17:37:16 +00:00
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
|
|
|
|
s := newStorage()
|
2019-11-08 17:37:16 +00:00
|
|
|
pendingHourEntries := &uint64set.Set{}
|
|
|
|
pendingHourEntries.Add(343)
|
|
|
|
pendingHourEntries.Add(32424)
|
|
|
|
pendingHourEntries.Add(8293432)
|
2022-11-07 12:04:06 +00:00
|
|
|
s.pendingHourEntries = pendingHourEntries
|
2019-06-09 16:06:53 +00:00
|
|
|
|
2022-11-07 12:04:06 +00:00
|
|
|
hour := fasttime.UnixHour()
|
2022-12-03 01:07:13 +00:00
|
|
|
if hour%24 == 0 {
|
|
|
|
hour++
|
|
|
|
}
|
2019-06-09 16:06:53 +00:00
|
|
|
hmOrig := &hourMetricIDs{
|
2019-09-24 18:10:22 +00:00
|
|
|
m: &uint64set.Set{},
|
2022-12-03 01:07:13 +00:00
|
|
|
hour: hour - 1,
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
2019-09-24 18:10:22 +00:00
|
|
|
hmOrig.m.Add(12)
|
|
|
|
hmOrig.m.Add(34)
|
2019-06-09 16:06:53 +00:00
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
2022-11-07 11:55:37 +00:00
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if hmCurr.hour != hour {
|
2022-11-07 11:55:37 +00:00
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
2019-11-08 17:37:16 +00:00
|
|
|
if !hmCurr.m.Equal(pendingHourEntries) {
|
|
|
|
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
|
|
|
}
|
|
|
|
|
2019-11-08 17:37:16 +00:00
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
|
|
|
|
s := newStorage()
|
2019-11-08 17:37:16 +00:00
|
|
|
pendingHourEntries := &uint64set.Set{}
|
|
|
|
pendingHourEntries.Add(343)
|
|
|
|
pendingHourEntries.Add(32424)
|
|
|
|
pendingHourEntries.Add(8293432)
|
2022-11-07 12:04:06 +00:00
|
|
|
s.pendingHourEntries = pendingHourEntries
|
2019-06-09 16:06:53 +00:00
|
|
|
|
2022-11-07 12:04:06 +00:00
|
|
|
hour := fasttime.UnixHour()
|
2019-06-09 16:06:53 +00:00
|
|
|
hmOrig := &hourMetricIDs{
|
2019-09-24 18:10:22 +00:00
|
|
|
m: &uint64set.Set{},
|
2019-06-09 16:06:53 +00:00
|
|
|
hour: hour,
|
|
|
|
}
|
2019-09-24 18:10:22 +00:00
|
|
|
hmOrig.m.Add(12)
|
|
|
|
hmOrig.m.Add(34)
|
2019-06-09 16:06:53 +00:00
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
2022-11-07 11:55:37 +00:00
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2022-12-03 01:07:13 +00:00
|
|
|
if hmCurr.hour != hour {
|
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
|
|
}
|
|
|
|
m := pendingHourEntries.Clone()
|
|
|
|
hmOrig.m.ForEach(func(part []uint64) bool {
|
|
|
|
for _, metricID := range part {
|
|
|
|
m.Add(metricID)
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
})
|
|
|
|
if !hmCurr.m.Equal(m) {
|
|
|
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
|
|
|
}
|
|
|
|
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2022-12-03 01:07:13 +00:00
|
|
|
hmEmpty := &hourMetricIDs{}
|
|
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
|
|
}
|
|
|
|
|
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("nonempty_pending_metric_ids_valid_curr_hour_start_of_day", func(t *testing.T) {
|
|
|
|
s := newStorage()
|
|
|
|
pendingHourEntries := &uint64set.Set{}
|
|
|
|
pendingHourEntries.Add(343)
|
|
|
|
pendingHourEntries.Add(32424)
|
|
|
|
pendingHourEntries.Add(8293432)
|
|
|
|
s.pendingHourEntries = pendingHourEntries
|
|
|
|
|
|
|
|
hour := fasttime.UnixHour()
|
|
|
|
hour -= hour % 24
|
|
|
|
hmOrig := &hourMetricIDs{
|
|
|
|
m: &uint64set.Set{},
|
|
|
|
hour: hour,
|
|
|
|
}
|
|
|
|
hmOrig.m.Add(12)
|
|
|
|
hmOrig.m.Add(34)
|
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
if hmCurr.hour != hour {
|
2022-11-07 11:55:37 +00:00
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
2019-11-08 17:37:16 +00:00
|
|
|
m := pendingHourEntries.Clone()
|
2020-01-15 10:12:46 +00:00
|
|
|
hmOrig.m.ForEach(func(part []uint64) bool {
|
|
|
|
for _, metricID := range part {
|
|
|
|
m.Add(metricID)
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
})
|
2019-11-08 11:16:40 +00:00
|
|
|
if !hmCurr.m.Equal(m) {
|
2019-06-09 16:06:53 +00:00
|
|
|
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
|
|
|
|
}
|
|
|
|
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2019-06-09 16:06:53 +00:00
|
|
|
hmEmpty := &hourMetricIDs{}
|
|
|
|
if !reflect.DeepEqual(hmPrev, hmEmpty) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
|
|
|
|
}
|
|
|
|
|
2022-11-07 11:55:37 +00:00
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
|
|
|
}
|
|
|
|
})
|
2022-11-07 12:04:06 +00:00
|
|
|
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
|
2022-11-07 11:55:37 +00:00
|
|
|
s := newStorage()
|
|
|
|
|
2022-11-07 12:04:06 +00:00
|
|
|
hour := fasttime.UnixHour()
|
|
|
|
hour -= hour % 24
|
2022-11-07 11:55:37 +00:00
|
|
|
|
2022-11-07 12:04:06 +00:00
|
|
|
pendingHourEntries := &uint64set.Set{}
|
|
|
|
pendingHourEntries.Add(343)
|
|
|
|
pendingHourEntries.Add(32424)
|
|
|
|
pendingHourEntries.Add(8293432)
|
|
|
|
s.pendingHourEntries = pendingHourEntries
|
2022-11-07 11:55:37 +00:00
|
|
|
|
|
|
|
hmOrig := &hourMetricIDs{
|
|
|
|
m: &uint64set.Set{},
|
|
|
|
hour: hour - 1,
|
|
|
|
}
|
|
|
|
s.currHourMetricIDs.Store(hmOrig)
|
|
|
|
s.updateCurrHourMetricIDs(hour)
|
2023-07-20 00:37:49 +00:00
|
|
|
hmCurr := s.currHourMetricIDs.Load()
|
2022-11-07 11:55:37 +00:00
|
|
|
if hmCurr.hour != hour {
|
|
|
|
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
|
|
|
|
}
|
2022-11-07 12:04:06 +00:00
|
|
|
if hmCurr.m.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
|
2022-11-07 11:55:37 +00:00
|
|
|
}
|
2023-07-20 00:37:49 +00:00
|
|
|
hmPrev := s.prevHourMetricIDs.Load()
|
2022-11-07 12:04:06 +00:00
|
|
|
if !reflect.DeepEqual(hmPrev, hmOrig) {
|
|
|
|
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
|
2022-11-07 11:55:37 +00:00
|
|
|
}
|
2019-11-08 17:37:16 +00:00
|
|
|
if s.pendingHourEntries.Len() != 0 {
|
|
|
|
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
|
2019-06-09 16:06:53 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2019-05-22 21:16:55 +00:00
|
|
|
func TestMetricRowMarshalUnmarshal(t *testing.T) {
|
|
|
|
var buf []byte
|
|
|
|
typ := reflect.TypeOf(&MetricRow{})
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
for i := 0; i < 1000; i++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
v, ok := quick.Value(typ, rng)
|
2019-05-22 21:16:55 +00:00
|
|
|
if !ok {
|
|
|
|
t.Fatalf("cannot create random MetricRow via quick.Value")
|
|
|
|
}
|
|
|
|
mr1 := v.Interface().(*MetricRow)
|
|
|
|
if mr1 == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
buf = mr1.Marshal(buf[:0])
|
|
|
|
var mr2 MetricRow
|
2021-05-08 14:55:44 +00:00
|
|
|
tail, err := mr2.UnmarshalX(buf)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
|
|
|
|
}
|
|
|
|
if len(tail) > 0 {
|
|
|
|
t.Fatalf("non-empty tail returned after MetricRow.Unmarshal for mr1=%s", mr1)
|
|
|
|
}
|
|
|
|
if mr1.MetricNameRaw == nil {
|
|
|
|
mr1.MetricNameRaw = []byte{}
|
|
|
|
}
|
|
|
|
if mr2.MetricNameRaw == nil {
|
|
|
|
mr2.MetricNameRaw = []byte{}
|
|
|
|
}
|
|
|
|
if !reflect.DeepEqual(mr1, &mr2) {
|
|
|
|
t.Fatalf("mr1 should match mr2; got\nmr1=%s\nmr2=%s", mr1, &mr2)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-22 22:20:21 +00:00
|
|
|
func TestNextRetentionDeadlineSeconds(t *testing.T) {
|
|
|
|
f := func(currentTime string, retention, offset time.Duration, deadlineExpected string) {
|
2023-05-04 15:16:48 +00:00
|
|
|
t.Helper()
|
|
|
|
|
2023-07-22 22:20:21 +00:00
|
|
|
now, err := time.Parse(time.RFC3339, currentTime)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot parse currentTime=%q: %s", currentTime, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2023-05-16 15:14:08 +00:00
|
|
|
|
2023-07-22 22:20:21 +00:00
|
|
|
d := nextRetentionDeadlineSeconds(now.Unix(), int64(retention.Seconds()), int64(offset.Seconds()))
|
|
|
|
deadline := time.Unix(d, 0).UTC().Format(time.RFC3339)
|
|
|
|
if deadline != deadlineExpected {
|
|
|
|
t.Fatalf("unexpected deadline; got %s; want %s", deadline, deadlineExpected)
|
|
|
|
}
|
2023-05-16 15:14:08 +00:00
|
|
|
}
|
|
|
|
|
2023-07-22 22:20:21 +00:00
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
|
|
|
f("2023-07-22T03:44:35Z", 24*time.Hour, 0, "2023-07-22T04:00:00Z")
|
|
|
|
f("2023-07-22T04:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
|
|
|
f("2023-07-22T23:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
|
|
|
f("2023-07-23T03:59:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
|
|
|
f("2023-07-22T01:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-22T02:00:00Z")
|
|
|
|
f("2023-07-22T02:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
|
|
|
f("2023-07-22T23:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
|
|
|
f("2023-07-23T01:59:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
|
|
|
|
f("2023-07-22T08:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-22T09:00:00Z")
|
|
|
|
f("2023-07-22T09:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
|
|
|
|
f("2023-07-22T15:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
|
|
|
|
f("2023-07-22T16:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-23T16:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
|
|
|
|
f("2023-07-22T21:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
|
|
|
|
f("2023-07-22T22:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-23T22:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
|
|
|
|
f("2023-07-22T09:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-22T10:00:00Z")
|
|
|
|
f("2023-07-22T10:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
|
|
|
|
|
|
|
|
f("2023-07-22T12:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
|
|
|
|
f("2023-07-22T14:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
|
|
|
|
f("2023-07-22T15:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-23T15:00:00Z")
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func TestStorageOpenClose(t *testing.T) {
|
|
|
|
path := "TestStorageOpenClose"
|
|
|
|
for i := 0; i < 10; i++ {
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, -1, 1e5, 1e6)
|
2019-05-22 21:16:55 +00:00
|
|
|
s.MustClose()
|
|
|
|
}
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStorageRandTimestamps(t *testing.T) {
|
|
|
|
path := "TestStorageRandTimestamps"
|
2022-12-05 23:15:00 +00:00
|
|
|
retentionMsecs := int64(10 * msecsPerMonth)
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, retentionMsecs, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
t.Run("serial", func(t *testing.T) {
|
|
|
|
for i := 0; i < 3; i++ {
|
|
|
|
if err := testStorageRandTimestamps(s); err != nil {
|
2022-12-05 23:15:00 +00:00
|
|
|
t.Fatalf("error on iteration %d: %s", i, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
s.MustClose()
|
2023-04-15 06:01:20 +00:00
|
|
|
s = MustOpenStorage(path, retentionMsecs, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
t.Run("concurrent", func(t *testing.T) {
|
|
|
|
ch := make(chan error, 3)
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
go func() {
|
|
|
|
var err error
|
|
|
|
for i := 0; i < 2; i++ {
|
|
|
|
err = testStorageRandTimestamps(s)
|
|
|
|
}
|
|
|
|
ch <- err
|
|
|
|
}()
|
|
|
|
}
|
2022-12-05 23:15:00 +00:00
|
|
|
tt := time.NewTimer(time.Second * 10)
|
2019-05-22 21:16:55 +00:00
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
2022-12-05 23:15:00 +00:00
|
|
|
t.Fatalf("error on iteration %d: %s", i, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-12-05 23:15:00 +00:00
|
|
|
case <-tt.C:
|
|
|
|
t.Fatalf("timeout on iteration %d", i)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func testStorageRandTimestamps(s *Storage) error {
|
2022-12-05 23:15:00 +00:00
|
|
|
currentTime := timestampFromTime(time.Now())
|
|
|
|
const rowsPerAdd = 5e3
|
|
|
|
const addsCount = 3
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
for i := 0; i < addsCount; i++ {
|
|
|
|
var mrs []MetricRow
|
|
|
|
var mn MetricName
|
|
|
|
mn.Tags = []Tag{
|
|
|
|
{[]byte("job"), []byte("webservice")},
|
|
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
|
|
}
|
|
|
|
for j := 0; j < rowsPerAdd; j++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rng.Intn(100)))
|
2019-05-22 21:16:55 +00:00
|
|
|
metricNameRaw := mn.marshalRaw(nil)
|
2023-01-24 04:10:29 +00:00
|
|
|
timestamp := currentTime - int64((rng.Float64()-0.2)*float64(2*s.retentionMsecs))
|
|
|
|
value := rng.NormFloat64() * 1e11
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
mr := MetricRow{
|
|
|
|
MetricNameRaw: metricNameRaw,
|
|
|
|
Timestamp: timestamp,
|
|
|
|
Value: value,
|
|
|
|
}
|
|
|
|
mrs = append(mrs, mr)
|
|
|
|
}
|
|
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
2019-08-25 12:28:32 +00:00
|
|
|
errStr := err.Error()
|
|
|
|
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify the storage contains rows.
|
|
|
|
var m Metrics
|
|
|
|
s.UpdateMetrics(&m)
|
2022-12-05 23:15:00 +00:00
|
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount == 0 {
|
|
|
|
return fmt.Errorf("expecting at least one row in storage")
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-07-05 20:56:31 +00:00
|
|
|
func TestStorageDeleteSeries(t *testing.T) {
|
|
|
|
path := "TestStorageDeleteSeries"
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, 0, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
// Verify no label names exist
|
|
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
if len(lns) != 0 {
|
|
|
|
t.Fatalf("found non-empty tag keys at the start: %q", lns)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
t.Run("serial", func(t *testing.T) {
|
|
|
|
for i := 0; i < 3; i++ {
|
2022-07-05 20:56:31 +00:00
|
|
|
if err = testStorageDeleteSeries(s, 0); err != nil {
|
2019-11-09 21:17:42 +00:00
|
|
|
t.Fatalf("unexpected error on iteration %d: %s", i, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Re-open the storage in order to check how deleted metricIDs
|
|
|
|
// are persisted.
|
|
|
|
s.MustClose()
|
2023-04-15 06:01:20 +00:00
|
|
|
s = MustOpenStorage(path, 0, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
t.Run("concurrent", func(t *testing.T) {
|
|
|
|
ch := make(chan error, 3)
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
go func(workerNum int) {
|
|
|
|
var err error
|
|
|
|
for j := 0; j < 2; j++ {
|
2022-07-05 20:56:31 +00:00
|
|
|
err = testStorageDeleteSeries(s, workerNum)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ch <- err
|
|
|
|
}(i)
|
|
|
|
}
|
2022-12-05 23:15:00 +00:00
|
|
|
tt := time.NewTimer(30 * time.Second)
|
2019-05-22 21:16:55 +00:00
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
2022-12-05 23:15:00 +00:00
|
|
|
t.Fatalf("unexpected error on iteration %d: %s", i, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-12-05 23:15:00 +00:00
|
|
|
case <-tt.C:
|
|
|
|
t.Fatalf("timeout on iteration %d", i)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
// Verify no more tag keys exist
|
2022-06-12 01:32:13 +00:00
|
|
|
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
if len(lns) != 0 {
|
|
|
|
t.Fatalf("found non-empty tag keys after the test: %q", lns)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-05 20:56:31 +00:00
|
|
|
func testStorageDeleteSeries(s *Storage, workerNum int) error {
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2019-05-22 21:16:55 +00:00
|
|
|
const rowsPerMetric = 100
|
|
|
|
const metricsCount = 30
|
|
|
|
|
|
|
|
workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))
|
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
lnsAll := make(map[string]bool)
|
|
|
|
lnsAll["__name__"] = true
|
2019-05-22 21:16:55 +00:00
|
|
|
for i := 0; i < metricsCount; i++ {
|
|
|
|
var mrs []MetricRow
|
|
|
|
var mn MetricName
|
|
|
|
job := fmt.Sprintf("job_%d_%d", i, workerNum)
|
|
|
|
instance := fmt.Sprintf("instance_%d_%d", i, workerNum)
|
|
|
|
mn.Tags = []Tag{
|
|
|
|
{[]byte("job"), []byte(job)},
|
|
|
|
{[]byte("instance"), []byte(instance)},
|
|
|
|
{workerTag, []byte("foobar")},
|
|
|
|
}
|
|
|
|
for i := range mn.Tags {
|
2022-06-12 01:32:13 +00:00
|
|
|
lnsAll[string(mn.Tags[i].Key)] = true
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
|
|
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
|
|
|
|
|
|
for j := 0; j < rowsPerMetric; j++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
timestamp := rng.Int63n(1e10)
|
|
|
|
value := rng.NormFloat64() * 1e6
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
mr := MetricRow{
|
|
|
|
MetricNameRaw: metricNameRaw,
|
|
|
|
Timestamp: timestamp,
|
|
|
|
Value: value,
|
|
|
|
}
|
|
|
|
mrs = append(mrs, mr)
|
|
|
|
}
|
|
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
2020-11-11 12:40:27 +00:00
|
|
|
s.DebugFlush()
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
// Verify tag values exist
|
2022-06-12 01:32:13 +00:00
|
|
|
tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if len(tvs) == 0 {
|
|
|
|
return fmt.Errorf("unexpected empty number of tag values for workerTag")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify tag keys exist
|
2022-06-12 01:32:13 +00:00
|
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
if err := checkLabelNames(lns, lnsAll); err != nil {
|
|
|
|
return fmt.Errorf("unexpected label names before metrics removal: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var sr Search
|
|
|
|
tr := TimeRange{
|
|
|
|
MinTimestamp: 0,
|
|
|
|
MaxTimestamp: 2e10,
|
|
|
|
}
|
|
|
|
metricBlocksCount := func(tfs *TagFilters) int {
|
2020-04-27 05:13:41 +00:00
|
|
|
// Verify the number of blocks
|
2019-05-22 21:16:55 +00:00
|
|
|
n := 0
|
2022-05-31 23:29:19 +00:00
|
|
|
sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
for sr.NextMetricBlock() {
|
|
|
|
n++
|
|
|
|
}
|
|
|
|
sr.MustClose()
|
|
|
|
return n
|
|
|
|
}
|
|
|
|
for i := 0; i < metricsCount; i++ {
|
|
|
|
tfs := NewTagFilters()
|
|
|
|
if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot add regexp tag filter: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
job := fmt.Sprintf("job_%d_%d", i, workerNum)
|
|
|
|
if err := tfs.Add([]byte("job"), []byte(job), false, false); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot add job tag filter: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if n := metricBlocksCount(tfs); n == 0 {
|
|
|
|
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
|
|
|
|
}
|
2022-07-05 20:56:31 +00:00
|
|
|
deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs})
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot delete metrics: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if deletedCount == 0 {
|
2019-11-09 16:48:58 +00:00
|
|
|
return fmt.Errorf("expecting non-zero number of deleted metrics on iteration %d", i)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if n := metricBlocksCount(tfs); n != 0 {
|
2022-07-05 20:56:31 +00:00
|
|
|
return fmt.Errorf("expecting zero metric blocks after DeleteSeries call for tfs=%s; got %d blocks", tfs, n)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Try deleting empty tfss
|
2022-07-05 20:56:31 +00:00
|
|
|
deletedCount, err = s.DeleteSeries(nil, nil)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot delete empty tfss: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if deletedCount != 0 {
|
|
|
|
return fmt.Errorf("expecting zero deleted metrics for empty tfss; got %d", deletedCount)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make sure no more metrics left for the given workerNum
|
|
|
|
tfs := NewTagFilters()
|
|
|
|
if err := tfs.Add(nil, []byte(fmt.Sprintf("metric_.+_%d", workerNum)), false, true); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot add regexp tag filter for worker metrics: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if n := metricBlocksCount(tfs); n != 0 {
|
|
|
|
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
|
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if len(tvs) != 0 {
|
|
|
|
return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
|
|
|
|
if len(lns) < len(lnsExpected) {
|
|
|
|
return fmt.Errorf("unexpected number of label names found; got %d; want at least %d; lns=%q, lnsExpected=%v", len(lns), len(lnsExpected), lns, lnsExpected)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
hasItem := func(s string, lns []string) bool {
|
|
|
|
for _, labelName := range lns {
|
|
|
|
if s == labelName {
|
2019-05-22 21:16:55 +00:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
for labelName := range lnsExpected {
|
|
|
|
if !hasItem(labelName, lns) {
|
|
|
|
return fmt.Errorf("cannot find %q in label names %q", labelName, lns)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-11-15 22:42:27 +00:00
|
|
|
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
|
|
|
|
path := "TestStorageRegisterMetricNamesSerial"
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, 0, 0, 0)
|
2020-11-15 22:42:27 +00:00
|
|
|
if err := testStorageRegisterMetricNames(s); err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
|
|
|
|
path := "TestStorageRegisterMetricNamesConcurrent"
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, 0, 0, 0)
|
2020-11-15 22:42:27 +00:00
|
|
|
ch := make(chan error, 3)
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
go func() {
|
|
|
|
ch <- testStorageRegisterMetricNames(s)
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
case <-time.After(10 * time.Second):
|
|
|
|
t.Fatalf("timeout")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func testStorageRegisterMetricNames(s *Storage) error {
|
|
|
|
const metricsPerAdd = 1e3
|
|
|
|
const addsCount = 10
|
|
|
|
|
|
|
|
addIDsMap := make(map[string]struct{})
|
|
|
|
for i := 0; i < addsCount; i++ {
|
|
|
|
var mrs []MetricRow
|
|
|
|
var mn MetricName
|
|
|
|
addID := fmt.Sprintf("%d", i)
|
|
|
|
addIDsMap[addID] = struct{}{}
|
|
|
|
mn.Tags = []Tag{
|
|
|
|
{[]byte("job"), []byte("webservice")},
|
|
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
|
|
{[]byte("add_id"), []byte(addID)},
|
|
|
|
}
|
|
|
|
now := timestampFromTime(time.Now())
|
|
|
|
for j := 0; j < metricsPerAdd; j++ {
|
2020-11-16 11:15:16 +00:00
|
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", j))
|
2020-11-15 22:42:27 +00:00
|
|
|
metricNameRaw := mn.marshalRaw(nil)
|
|
|
|
|
|
|
|
mr := MetricRow{
|
|
|
|
MetricNameRaw: metricNameRaw,
|
|
|
|
Timestamp: now,
|
|
|
|
}
|
|
|
|
mrs = append(mrs, mr)
|
|
|
|
}
|
lib/storage: switch from global to per-day index for `MetricName -> TSID` mapping
Previously all the newly ingested time series were registered in global `MetricName -> TSID` index.
This index was used during data ingestion for locating the TSID (internal series id)
for the given canonical metric name (the canonical metric name consists of metric name plus all its labels sorted by label names).
The `MetricName -> TSID` index is stored on disk in order to make sure that the data
isn't lost on VictoriaMetrics restart or unclean shutdown.
The lookup in this index is relatively slow, since VictoriaMetrics needs to read the corresponding
data block from disk, unpack it, put the unpacked block into `indexdb/dataBlocks` cache,
and then search for the given `MetricName -> TSID` entry there. So VictoriaMetrics
uses in-memory cache for speeding up the lookup for active time series.
This cache is named `storage/tsid`. If this cache capacity is enough for all the currently ingested
active time series, then VictoriaMetrics works fast, since it doesn't need to read the data from disk.
VictoriaMetrics starts reading data from `MetricName -> TSID` on-disk index in the following cases:
- If `storage/tsid` cache capacity isn't enough for active time series.
Then just increase available memory for VictoriaMetrics or reduce the number of active time series
ingested into VictoriaMetrics.
- If new time series is ingested into VictoriaMetrics. In this case it cannot find
the needed entry in the `storage/tsid` cache, so it needs to consult on-disk `MetricName -> TSID` index,
since it doesn't know that the index has no the corresponding entry too.
This is a typical event under high churn rate, when old time series are constantly substituted
with new time series.
Reading the data from `MetricName -> TSID` index is slow, so inserts, which lead to reading this index,
are counted as slow inserts, and they can be monitored via `vm_slow_row_inserts_total` metric exposed by VictoriaMetrics.
Prior to this commit the `MetricName -> TSID` index was global, e.g. it contained entries sorted by `MetricName`
for all the time series ever ingested into VictoriaMetrics during the configured -retentionPeriod.
This index can become very large under high churn rate and long retention. VictoriaMetrics
caches data from this index in `indexdb/dataBlocks` in-memory cache for speeding up index lookups.
The `indexdb/dataBlocks` cache may occupy significant share of available memory for storing
recently accessed blocks at `MetricName -> TSID` index when searching for newly ingested time series.
This commit switches from global `MetricName -> TSID` index to per-day index. This allows significantly
reducing the amounts of data, which needs to be cached in `indexdb/dataBlocks`, since now VictoriaMetrics
consults only the index for the current day when new time series is ingested into it.
The downside of this change is increased indexdb size on disk for workloads without high churn rate,
e.g. with static time series, which do no change over time, since now VictoriaMetrics needs to store
identical `MetricName -> TSID` entries for static time series for every day.
This change removes an optimization for reducing CPU and disk IO spikes at indexdb rotation,
since it didn't work correctly - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 .
At the same time the change fixes the issue, which could result in lost access to time series,
which stop receving new samples during the first hour after indexdb rotation - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
The issue with the increased CPU and disk IO usage during indexdb rotation will be addressed
in a separate commit according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401#issuecomment-1553488685
This is a follow-up for 1f28b46ae9350795af41cbfc3ca0e8a5af084fce
2023-07-13 22:33:41 +00:00
|
|
|
s.RegisterMetricNames(nil, mrs)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
var addIDsExpected []string
|
|
|
|
for k := range addIDsMap {
|
|
|
|
addIDsExpected = append(addIDsExpected, k)
|
|
|
|
}
|
|
|
|
sort.Strings(addIDsExpected)
|
|
|
|
|
|
|
|
// Verify the storage contains the added metric names.
|
|
|
|
s.DebugFlush()
|
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
// Verify that SearchLabelNamesWithFiltersOnTimeRange returns correct result.
|
|
|
|
lnsExpected := []string{
|
|
|
|
"__name__",
|
2020-11-15 22:42:27 +00:00
|
|
|
"add_id",
|
|
|
|
"instance",
|
|
|
|
"job",
|
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 100, 1e9, noDeadline)
|
2020-11-15 22:42:27 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
sort.Strings(lns)
|
|
|
|
if !reflect.DeepEqual(lns, lnsExpected) {
|
|
|
|
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
|
2022-06-12 11:17:44 +00:00
|
|
|
// Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns correct result.
|
2020-11-15 22:42:27 +00:00
|
|
|
now := timestampFromTime(time.Now())
|
|
|
|
start := now - msecPerDay
|
|
|
|
end := now + 60*1000
|
|
|
|
tr := TimeRange{
|
|
|
|
MinTimestamp: start,
|
|
|
|
MaxTimestamp: end,
|
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 100, 1e9, noDeadline)
|
2020-11-15 22:42:27 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
2022-06-12 01:32:13 +00:00
|
|
|
sort.Strings(lns)
|
|
|
|
if !reflect.DeepEqual(lns, lnsExpected) {
|
|
|
|
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
// Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result.
|
|
|
|
addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline)
|
2020-11-15 22:42:27 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
sort.Strings(addIDs)
|
|
|
|
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
|
2022-06-12 01:32:13 +00:00
|
|
|
// Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result.
|
|
|
|
addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, tr, addsCount+100, 1e9, noDeadline)
|
2020-11-15 22:42:27 +00:00
|
|
|
if err != nil {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
sort.Strings(addIDs)
|
|
|
|
if !reflect.DeepEqual(addIDs, addIDsExpected) {
|
2022-06-12 01:32:13 +00:00
|
|
|
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
|
2020-11-15 22:42:27 +00:00
|
|
|
}
|
|
|
|
|
2020-11-16 11:15:16 +00:00
|
|
|
// Verify that SearchMetricNames returns correct result.
|
|
|
|
tfs := NewTagFilters()
|
|
|
|
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
|
|
|
|
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
|
|
|
|
}
|
2022-06-28 14:36:27 +00:00
|
|
|
metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
|
2020-11-16 11:15:16 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error in SearchMetricNames: %w", err)
|
|
|
|
}
|
2022-06-28 14:36:27 +00:00
|
|
|
if len(metricNames) < metricsPerAdd {
|
|
|
|
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
|
2020-11-16 11:15:16 +00:00
|
|
|
}
|
2022-06-28 14:36:27 +00:00
|
|
|
var mn MetricName
|
|
|
|
for i, metricName := range metricNames {
|
|
|
|
if err := mn.UnmarshalString(metricName); err != nil {
|
|
|
|
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
|
|
|
|
}
|
2020-11-16 11:15:16 +00:00
|
|
|
addID := mn.GetTagValue("add_id")
|
|
|
|
if string(addID) != "0" {
|
|
|
|
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
|
|
|
|
}
|
|
|
|
job := mn.GetTagValue("job")
|
|
|
|
if string(job) != "webservice" {
|
|
|
|
return fmt.Errorf("unexpected job for metricName #%d; got %q; want %q", i, job, "webservice")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-15 22:42:27 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-03-24 20:24:54 +00:00
|
|
|
func TestStorageAddRowsSerial(t *testing.T) {
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2020-03-24 20:24:54 +00:00
|
|
|
path := "TestStorageAddRowsSerial"
|
2022-12-05 23:15:00 +00:00
|
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
|
2023-01-24 04:10:29 +00:00
|
|
|
if err := testStorageAddRows(rng, s); err != nil {
|
2020-03-24 20:24:54 +00:00
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStorageAddRowsConcurrent(t *testing.T) {
|
|
|
|
path := "TestStorageAddRowsConcurrent"
|
2022-12-05 23:15:00 +00:00
|
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
|
2020-03-24 20:24:54 +00:00
|
|
|
ch := make(chan error, 3)
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
go func(n int) {
|
|
|
|
rLocal := rand.New(rand.NewSource(int64(n)))
|
|
|
|
ch <- testStorageAddRows(rLocal, s)
|
|
|
|
}(i)
|
2020-03-24 20:24:54 +00:00
|
|
|
}
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2020-03-24 20:24:54 +00:00
|
|
|
case <-time.After(10 * time.Second):
|
|
|
|
t.Fatalf("timeout")
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
2020-03-24 20:24:54 +00:00
|
|
|
}
|
2019-05-22 21:16:55 +00:00
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-24 04:10:29 +00:00
|
|
|
func testGenerateMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
|
lib/index: reduce read/write load after indexDB rotation (#2177)
* lib/index: reduce read/write load after indexDB rotation
IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.
IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.
The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.
To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.
When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.
This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.
The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
Signed-off-by: hagen1778 <roman@victoriametrics.com>
* wip
* wip
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
|
|
|
var mrs []MetricRow
|
|
|
|
var mn MetricName
|
|
|
|
mn.Tags = []Tag{
|
|
|
|
{[]byte("job"), []byte("webservice")},
|
|
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
|
|
}
|
|
|
|
for i := 0; i < int(rows); i++ {
|
|
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
|
|
|
|
metricNameRaw := mn.marshalRaw(nil)
|
2023-01-24 04:10:29 +00:00
|
|
|
timestamp := rng.Int63n(timestampMax-timestampMin) + timestampMin
|
|
|
|
value := rng.NormFloat64() * 1e6
|
lib/index: reduce read/write load after indexDB rotation (#2177)
* lib/index: reduce read/write load after indexDB rotation
IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.
IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.
The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.
To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.
When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.
This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.
The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
Signed-off-by: hagen1778 <roman@victoriametrics.com>
* wip
* wip
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
|
|
|
|
|
|
|
mr := MetricRow{
|
|
|
|
MetricNameRaw: metricNameRaw,
|
|
|
|
Timestamp: timestamp,
|
|
|
|
Value: value,
|
|
|
|
}
|
|
|
|
mrs = append(mrs, mr)
|
|
|
|
}
|
|
|
|
return mrs
|
|
|
|
}
|
|
|
|
|
2023-01-24 04:10:29 +00:00
|
|
|
func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
2019-05-22 21:16:55 +00:00
|
|
|
const rowsPerAdd = 1e3
|
|
|
|
const addsCount = 10
|
|
|
|
|
2022-12-05 23:15:00 +00:00
|
|
|
maxTimestamp := timestampFromTime(time.Now())
|
2023-07-13 22:01:56 +00:00
|
|
|
minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
|
2019-05-22 21:16:55 +00:00
|
|
|
for i := 0; i < addsCount; i++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify the storage contains rows.
|
lib/index: reduce read/write load after indexDB rotation (#2177)
* lib/index: reduce read/write load after indexDB rotation
IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.
IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.
The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.
To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.
When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.
This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.
The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
Signed-off-by: hagen1778 <roman@victoriametrics.com>
* wip
* wip
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
|
|
|
minRowsExpected := uint64(rowsPerAdd * addsCount)
|
2019-05-22 21:16:55 +00:00
|
|
|
var m Metrics
|
|
|
|
s.UpdateMetrics(&m)
|
2022-12-05 23:15:00 +00:00
|
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
|
|
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Try creating a snapshot from the storage.
|
2023-02-27 20:12:03 +00:00
|
|
|
snapshotName, err := s.CreateSnapshot(0)
|
2019-05-22 21:16:55 +00:00
|
|
|
if err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot create snapshot from the storage: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Verify the snapshot is visible
|
|
|
|
snapshots, err := s.ListSnapshots()
|
|
|
|
if err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot list snapshots: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if !containsString(snapshots, snapshotName) {
|
|
|
|
return fmt.Errorf("cannot find snapshot %q in %q", snapshotName, snapshots)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try opening the storage from snapshot.
|
2023-03-25 21:33:54 +00:00
|
|
|
snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
|
2023-04-15 06:01:20 +00:00
|
|
|
s1 := MustOpenStorage(snapshotPath, 0, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
// Verify the snapshot contains rows
|
|
|
|
var m1 Metrics
|
|
|
|
s1.UpdateMetrics(&m1)
|
2022-12-05 23:15:00 +00:00
|
|
|
if rowsCount := m1.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
|
|
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, rowsCount)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
|
2023-07-13 22:01:56 +00:00
|
|
|
// Verify that force merge for the snapshot leaves at most a single part per partition.
|
|
|
|
// Zero parts are possible if the snapshot is created just after the partition has been created
|
|
|
|
// by concurrent goroutine, but it didn't put the data into it yet.
|
2020-09-17 09:01:53 +00:00
|
|
|
if err := s1.ForceMergePartitions(""); err != nil {
|
|
|
|
return fmt.Errorf("error when force merging partitions: %w", err)
|
|
|
|
}
|
|
|
|
ptws := s1.tb.GetPartitions(nil)
|
|
|
|
for _, ptw := range ptws {
|
2023-02-01 17:54:21 +00:00
|
|
|
pws := ptw.pt.GetParts(nil, true)
|
2020-09-17 09:01:53 +00:00
|
|
|
numParts := len(pws)
|
|
|
|
ptw.pt.PutParts(pws)
|
2023-07-13 22:01:56 +00:00
|
|
|
if numParts > 1 {
|
2021-02-17 12:59:04 +00:00
|
|
|
s1.tb.PutPartitions(ptws)
|
2023-07-13 22:01:56 +00:00
|
|
|
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
|
2020-09-17 09:01:53 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-17 12:59:04 +00:00
|
|
|
s1.tb.PutPartitions(ptws)
|
2020-09-17 09:01:53 +00:00
|
|
|
|
2019-05-22 21:16:55 +00:00
|
|
|
s1.MustClose()
|
|
|
|
|
|
|
|
// Delete the snapshot and make sure it is no longer visible.
|
|
|
|
if err := s.DeleteSnapshot(snapshotName); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
snapshots, err = s.ListSnapshots()
|
|
|
|
if err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("cannot list snapshots: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
if containsString(snapshots, snapshotName) {
|
|
|
|
return fmt.Errorf("snapshot %q must be deleted, but is still visible in %q", snapshotName, snapshots)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestStorageRotateIndexDB(t *testing.T) {
|
|
|
|
path := "TestStorageRotateIndexDB"
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, 0, 0, 0)
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
// Start indexDB rotater in a separate goroutine
|
|
|
|
stopCh := make(chan struct{})
|
|
|
|
rotateDoneCh := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-stopCh:
|
|
|
|
close(rotateDoneCh)
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
time.Sleep(time.Millisecond)
|
2023-07-29 02:47:02 +00:00
|
|
|
s.mustRotateIndexDB(time.Now())
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Run concurrent workers that insert / select data from the storage.
|
|
|
|
ch := make(chan error, 3)
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
go func(workerNum int) {
|
|
|
|
ch <- testStorageAddMetrics(s, workerNum)
|
|
|
|
}(i)
|
|
|
|
}
|
|
|
|
for i := 0; i < cap(ch); i++ {
|
|
|
|
select {
|
|
|
|
case err := <-ch:
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("unexpected error: %s", err)
|
|
|
|
}
|
|
|
|
case <-time.After(10 * time.Second):
|
|
|
|
t.Fatalf("timeout")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
close(stopCh)
|
|
|
|
<-rotateDoneCh
|
|
|
|
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func testStorageAddMetrics(s *Storage, workerNum int) error {
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2019-05-22 21:16:55 +00:00
|
|
|
const rowsCount = 1e3
|
|
|
|
|
|
|
|
var mn MetricName
|
|
|
|
mn.Tags = []Tag{
|
|
|
|
{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
|
|
|
|
{[]byte("instance"), []byte("1.2.3.4")},
|
|
|
|
}
|
|
|
|
for i := 0; i < rowsCount; i++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rng.Intn(10)))
|
2019-05-22 21:16:55 +00:00
|
|
|
metricNameRaw := mn.marshalRaw(nil)
|
2023-01-24 04:10:29 +00:00
|
|
|
timestamp := rng.Int63n(1e10)
|
|
|
|
value := rng.NormFloat64() * 1e6
|
2019-05-22 21:16:55 +00:00
|
|
|
|
|
|
|
mr := MetricRow{
|
|
|
|
MetricNameRaw: metricNameRaw,
|
|
|
|
Timestamp: timestamp,
|
|
|
|
Value: value,
|
|
|
|
}
|
|
|
|
if err := s.AddRows([]MetricRow{mr}, defaultPrecisionBits); err != nil {
|
2020-06-30 19:58:18 +00:00
|
|
|
return fmt.Errorf("unexpected error when adding mrs: %w", err)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Verify the storage contains rows.
|
|
|
|
minRowsExpected := uint64(rowsCount)
|
|
|
|
var m Metrics
|
|
|
|
s.UpdateMetrics(&m)
|
2022-12-05 23:15:00 +00:00
|
|
|
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
|
|
|
|
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
|
2019-05-22 21:16:55 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-05-02 08:00:15 +00:00
|
|
|
func TestStorageDeleteStaleSnapshots(t *testing.T) {
|
2023-01-24 04:10:29 +00:00
|
|
|
rng := rand.New(rand.NewSource(1))
|
2022-05-02 08:00:15 +00:00
|
|
|
path := "TestStorageDeleteStaleSnapshots"
|
2022-12-05 23:15:00 +00:00
|
|
|
retentionMsecs := int64(msecsPerMonth * 10)
|
2023-04-15 06:01:20 +00:00
|
|
|
s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5)
|
2022-05-02 08:00:15 +00:00
|
|
|
const rowsPerAdd = 1e3
|
|
|
|
const addsCount = 10
|
2022-12-05 23:15:00 +00:00
|
|
|
maxTimestamp := timestampFromTime(time.Now())
|
|
|
|
minTimestamp := maxTimestamp - s.retentionMsecs
|
2022-05-02 08:00:15 +00:00
|
|
|
for i := 0; i < addsCount; i++ {
|
2023-01-24 04:10:29 +00:00
|
|
|
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
|
2022-05-02 08:00:15 +00:00
|
|
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
|
|
|
t.Fatalf("unexpected error when adding mrs: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Try creating a snapshot from the storage.
|
2023-02-27 20:12:03 +00:00
|
|
|
snapshotName, err := s.CreateSnapshot(0)
|
2022-05-02 08:00:15 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot create snapshot from the storage: %s", err)
|
|
|
|
}
|
|
|
|
// Delete snapshots older than 1 month
|
|
|
|
if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil {
|
|
|
|
t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err)
|
|
|
|
}
|
|
|
|
snapshots, err := s.ListSnapshots()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot list snapshots: %s", err)
|
|
|
|
}
|
|
|
|
if len(snapshots) != 1 {
|
|
|
|
t.Fatalf("expecting one snapshot; got %q", snapshots)
|
|
|
|
}
|
|
|
|
if snapshots[0] != snapshotName {
|
|
|
|
t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete the snapshot which is older than 1 nanoseconds
|
|
|
|
time.Sleep(2 * time.Nanosecond)
|
|
|
|
if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil {
|
|
|
|
t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err)
|
|
|
|
}
|
|
|
|
snapshots, err = s.ListSnapshots()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot list snapshots: %s", err)
|
|
|
|
}
|
|
|
|
if len(snapshots) != 0 {
|
|
|
|
t.Fatalf("expecting zero snapshots; got %q", snapshots)
|
|
|
|
}
|
|
|
|
s.MustClose()
|
|
|
|
if err := os.RemoveAll(path); err != nil {
|
|
|
|
t.Fatalf("cannot remove %q: %s", path, err)
|
|
|
|
}
|
|
|
|
}
|