VictoriaMetrics/lib/storage/storage_test.go

2168 lines
67 KiB
Go
Raw Normal View History

2019-05-22 21:16:55 +00:00
package storage
import (
"fmt"
"math/rand"
"os"
"path/filepath"
2019-05-22 21:16:55 +00:00
"reflect"
"sort"
"sync"
2019-05-22 21:16:55 +00:00
"testing"
"testing/quick"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
2019-05-22 21:16:55 +00:00
)
func TestReplaceAlternateRegexpsWithGraphiteWildcards(t *testing.T) {
f := func(q, resultExpected string) {
t.Helper()
result := replaceAlternateRegexpsWithGraphiteWildcards([]byte(q))
if string(result) != resultExpected {
t.Fatalf("unexpected result for %s\ngot\n%s\nwant\n%s", q, result, resultExpected)
}
}
f("", "")
f("foo", "foo")
f("foo(bar", "foo(bar")
f("foo.(bar|baz", "foo.(bar|baz")
f("foo.(bar).x", "foo.{bar}.x")
f("foo.(bar|baz).*.{x,y}", "foo.{bar,baz}.*.{x,y}")
f("foo.(bar|baz).*.{x,y}(z|aa)", "foo.{bar,baz}.*.{x,y}{z,aa}")
f("foo(.*)", "foo*")
}
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
f := func(q, expectedRegexp string) {
t.Helper()
re, err := getRegexpForGraphiteQuery(q)
if err != nil {
t.Fatalf("unexpected error for query=%q: %s", q, err)
}
reStr := re.String()
if reStr != expectedRegexp {
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
}
}
f(``, `^$`)
f(`*`, `^[^.]*$`)
f(`foo.`, `^foo\.$`)
f(`foo.bar`, `^foo\.bar$`)
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
f(`[-a-zx.]`, `^[-a-zx.]$`)
f(`**`, `^[^.]*[^.]*$`)
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
f(`foo{bar`, `^foo\{bar$`)
f(`foo{ba,r`, `^foo\{ba,r$`)
f(`foo[bar`, `^foo\[bar$`)
f(`foo{bar}`, `^foobar$`)
f(`foo{bar,,b{{a,b*},z},[x-y]*z}a`, `^foo(?:bar||b(?:(?:a|b[^.]*)|z)|[x-y][^.]*z)a$`)
}
func TestDateMetricIDCacheSerial(t *testing.T) {
c := newDateMetricIDCache()
if err := testDateMetricIDCache(c, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
func TestDateMetricIDCacheConcurrent(t *testing.T) {
c := newDateMetricIDCache()
ch := make(chan error, 5)
for i := 0; i < 5; i++ {
go func() {
ch <- testDateMetricIDCache(c, true)
}()
}
for i := 0; i < 5; i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(time.Second * 5):
t.Fatalf("timeout")
}
}
}
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
type dmk struct {
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
generation uint64
date uint64
metricID uint64
}
m := make(map[dmk]bool)
for i := 0; i < 1e5; i++ {
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
generation := uint64(i) % 2
date := uint64(i) % 2
metricID := uint64(i) % 1237
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
if !concurrent && c.Has(generation, date, metricID) {
if !m[dmk{generation, date, metricID}] {
return fmt.Errorf("c.Has(%d, %d, %d) must return false, but returned true", generation, date, metricID)
}
continue
}
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
c.Set(generation, date, metricID)
m[dmk{generation, date, metricID}] = true
if !concurrent && !c.Has(generation, date, metricID) {
return fmt.Errorf("c.Has(%d, %d, %d) must return true, but returned false", generation, date, metricID)
}
if i%11234 == 0 {
c.mu.Lock()
c.syncLocked()
c.mu.Unlock()
}
if i%34323 == 0 {
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
m = make(map[dmk]bool)
}
}
// Verify fast path after sync.
for i := 0; i < 1e5; i++ {
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
generation := uint64(i) % 2
date := uint64(i) % 2
metricID := uint64(i) % 123
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
c.Set(generation, date, metricID)
}
c.mu.Lock()
c.syncLocked()
c.mu.Unlock()
for i := 0; i < 1e5; i++ {
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
generation := uint64(i) % 2
date := uint64(i) % 2
metricID := uint64(i) % 123
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
if !concurrent && !c.Has(generation, date, metricID) {
return fmt.Errorf("c.Has(%d, %d, %d) must return true after sync", generation, date, metricID)
}
}
// Verify c.Reset
if n := c.EntriesCount(); !concurrent && n < 123 {
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
}
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
if n := c.EntriesCount(); !concurrent && n > 0 {
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
}
return nil
}
func TestDateMetricIDCacheIsConsistent(_ *testing.T) {
const (
generation = 1
date = 1
concurrency = 2
numMetrics = 100000
)
dmc := newDateMetricIDCache()
var wg sync.WaitGroup
for i := range concurrency {
wg.Add(1)
go func() {
defer wg.Done()
for id := uint64(i * numMetrics); id < uint64((i+1)*numMetrics); id++ {
dmc.Set(generation, date, id)
if !dmc.Has(generation, date, id) {
panic(fmt.Errorf("dmc.Has(metricID=%d): unexpected cache miss after adding the entry to cache", id))
}
}
}()
}
wg.Wait()
}
func TestUpdateCurrHourMetricIDs(t *testing.T) {
newStorage := func() *Storage {
var s Storage
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
s.pendingHourEntries = &uint64set.Set{}
return &s
}
t.Run("empty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
hour := fasttime.UnixHour()
if hour%24 == 0 {
hour++
}
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
}
hmPrev := s.prevHourMetricIDs.Load()
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
2019-11-08 17:37:16 +00:00
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("empty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
if !reflect.DeepEqual(hmCurr, hmOrig) {
t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
}
hmPrev := s.prevHourMetricIDs.Load()
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
2019-11-08 17:37:16 +00:00
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_stale_curr_hour", func(t *testing.T) {
s := newStorage()
2019-11-08 17:37:16 +00:00
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
if hour%24 == 0 {
hour++
}
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
2019-11-08 17:37:16 +00:00
if !hmCurr.m.Equal(pendingHourEntries) {
t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
}
hmPrev := s.prevHourMetricIDs.Load()
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
2019-11-08 17:37:16 +00:00
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour", func(t *testing.T) {
s := newStorage()
2019-11-08 17:37:16 +00:00
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
m := pendingHourEntries.Clone()
hmOrig.m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
m.Add(metricID)
}
return true
})
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
hmPrev := s.prevHourMetricIDs.Load()
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_valid_curr_hour_start_of_day", func(t *testing.T) {
s := newStorage()
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hour := fasttime.UnixHour()
hour -= hour % 24
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour,
}
hmOrig.m.Add(12)
hmOrig.m.Add(34)
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
2019-11-08 17:37:16 +00:00
m := pendingHourEntries.Clone()
hmOrig.m.ForEach(func(part []uint64) bool {
for _, metricID := range part {
m.Add(metricID)
}
return true
})
if !hmCurr.m.Equal(m) {
t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
}
hmPrev := s.prevHourMetricIDs.Load()
hmEmpty := &hourMetricIDs{}
if !reflect.DeepEqual(hmPrev, hmEmpty) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
}
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
t.Run("nonempty_pending_metric_ids_from_previous_hour_new_day", func(t *testing.T) {
s := newStorage()
hour := fasttime.UnixHour()
hour -= hour % 24
pendingHourEntries := &uint64set.Set{}
pendingHourEntries.Add(343)
pendingHourEntries.Add(32424)
pendingHourEntries.Add(8293432)
s.pendingHourEntries = pendingHourEntries
hmOrig := &hourMetricIDs{
m: &uint64set.Set{},
hour: hour - 1,
}
s.currHourMetricIDs.Store(hmOrig)
s.updateCurrHourMetricIDs(hour)
hmCurr := s.currHourMetricIDs.Load()
if hmCurr.hour != hour {
t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
}
if hmCurr.m.Len() != 0 {
t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
}
hmPrev := s.prevHourMetricIDs.Load()
if !reflect.DeepEqual(hmPrev, hmOrig) {
t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
}
2019-11-08 17:37:16 +00:00
if s.pendingHourEntries.Len() != 0 {
t.Fatalf("unexpected s.pendingHourEntries.Len(); got %d; want %d", s.pendingHourEntries.Len(), 0)
}
})
}
2019-05-22 21:16:55 +00:00
func TestMetricRowMarshalUnmarshal(t *testing.T) {
var buf []byte
typ := reflect.TypeOf(&MetricRow{})
rng := rand.New(rand.NewSource(1))
2019-05-22 21:16:55 +00:00
for i := 0; i < 1000; i++ {
v, ok := quick.Value(typ, rng)
2019-05-22 21:16:55 +00:00
if !ok {
t.Fatalf("cannot create random MetricRow via quick.Value")
}
mr1 := v.Interface().(*MetricRow)
if mr1 == nil {
continue
}
buf = mr1.Marshal(buf[:0])
var mr2 MetricRow
tail, err := mr2.UnmarshalX(buf)
2019-05-22 21:16:55 +00:00
if err != nil {
t.Fatalf("cannot unmarshal mr1=%s: %s", mr1, err)
}
if len(tail) > 0 {
t.Fatalf("non-empty tail returned after MetricRow.Unmarshal for mr1=%s", mr1)
}
if mr1.MetricNameRaw == nil {
mr1.MetricNameRaw = []byte{}
}
if mr2.MetricNameRaw == nil {
mr2.MetricNameRaw = []byte{}
}
if !reflect.DeepEqual(mr1, &mr2) {
t.Fatalf("mr1 should match mr2; got\nmr1=%s\nmr2=%s", mr1, &mr2)
}
}
}
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
func TestNextRetentionDeadlineSeconds(t *testing.T) {
f := func(currentTime string, retention, offset time.Duration, deadlineExpected string) {
t.Helper()
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
now, err := time.Parse(time.RFC3339, currentTime)
if err != nil {
t.Fatalf("cannot parse currentTime=%q: %s", currentTime, err)
2019-05-22 21:16:55 +00:00
}
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
d := nextRetentionDeadlineSeconds(now.Unix(), int64(retention.Seconds()), int64(offset.Seconds()))
deadline := time.Unix(d, 0).UTC().Format(time.RFC3339)
if deadline != deadlineExpected {
t.Fatalf("unexpected deadline; got %s; want %s", deadline, deadlineExpected)
}
}
lib/storage: pre-create timeseries before indexDB rotation (#4652) * lib/storage: pre-create timeseries before indexDB rotation during an hour before indexDB rotation start creating records at the next indexDB it must improve performance during switch for the next indexDB and remove ingestion issues. Since there is no need for creation new index records for timeseries already ingested into current indexDB https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 * lib/storage: further work on indexdb rotation optimization - Document the change at docs/CHAGNELOG.md - Move back various caches from indexDB to Storage. This makes the change less intrusive. The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries for both the current and the next indexDB. - Consolidate the code responsible for idbNext pre-filling into prefillNextIndexDB() function. This improves code readability and maintainability a bit. - Rewrite and simplify the code responsible for calculating the next retention timestamp. Add various tests for corner cases of this code. - Remove indexdb pre-filling from RegisterMetricNames() function, since this function is rarely called. It is OK to add indexdb entries on demand in this function. This simplifies the code. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 * docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563 --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2023-07-22 22:20:21 +00:00
f("2023-07-22T12:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-22T03:44:35Z", 24*time.Hour, 0, "2023-07-22T04:00:00Z")
f("2023-07-22T04:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-22T23:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-23T03:59:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T01:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-22T02:00:00Z")
f("2023-07-22T02:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T23:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-23T01:59:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
f("2023-07-22T08:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-22T09:00:00Z")
f("2023-07-22T09:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
f("2023-07-22T15:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
f("2023-07-22T16:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-23T16:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
f("2023-07-22T21:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
f("2023-07-22T22:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-23T22:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
f("2023-07-22T09:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-22T10:00:00Z")
f("2023-07-22T10:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
f("2023-07-22T14:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
f("2023-07-22T15:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-23T15:00:00Z")
2019-05-22 21:16:55 +00:00
}
func TestStorageOpenClose(t *testing.T) {
path := "TestStorageOpenClose"
for i := 0; i < 10; i++ {
s := MustOpenStorage(path, -1, 1e5, 1e6)
2019-05-22 21:16:55 +00:00
s.MustClose()
}
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageRandTimestamps(t *testing.T) {
path := "TestStorageRandTimestamps"
retention := 10 * retention31Days
s := MustOpenStorage(path, retention, 0, 0)
2019-05-22 21:16:55 +00:00
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err := testStorageRandTimestamps(s); err != nil {
t.Fatalf("error on iteration %d: %s", i, err)
2019-05-22 21:16:55 +00:00
}
s.MustClose()
s = MustOpenStorage(path, retention, 0, 0)
2019-05-22 21:16:55 +00:00
}
})
t.Run("concurrent", func(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
var err error
for i := 0; i < 2; i++ {
err = testStorageRandTimestamps(s)
}
ch <- err
}()
}
tt := time.NewTimer(time.Second * 10)
2019-05-22 21:16:55 +00:00
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("error on iteration %d: %s", i, err)
2019-05-22 21:16:55 +00:00
}
case <-tt.C:
t.Fatalf("timeout on iteration %d", i)
2019-05-22 21:16:55 +00:00
}
}
})
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageRandTimestamps(s *Storage) error {
currentTime := timestampFromTime(time.Now())
const rowsPerAdd = 5e3
const addsCount = 3
rng := rand.New(rand.NewSource(1))
2019-05-22 21:16:55 +00:00
for i := 0; i < addsCount; i++ {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for j := 0; j < rowsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rng.Intn(100)))
2019-05-22 21:16:55 +00:00
metricNameRaw := mn.marshalRaw(nil)
timestamp := currentTime - int64((rng.Float64()-0.2)*float64(2*s.retentionMsecs))
value := rng.NormFloat64() * 1e11
2019-05-22 21:16:55 +00:00
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
s.AddRows(mrs, defaultPrecisionBits)
2019-05-22 21:16:55 +00:00
}
// Verify the storage contains rows.
var m Metrics
s.UpdateMetrics(&m)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount == 0 {
return fmt.Errorf("expecting at least one row in storage")
2019-05-22 21:16:55 +00:00
}
return nil
}
func TestStorageDeleteSeries(t *testing.T) {
path := "TestStorageDeleteSeries"
s := MustOpenStorage(path, 0, 0, 0)
2019-05-22 21:16:55 +00:00
// Verify no label names exist
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
2019-05-22 21:16:55 +00:00
if err != nil {
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
2019-05-22 21:16:55 +00:00
}
if len(lns) != 0 {
t.Fatalf("found non-empty tag keys at the start: %q", lns)
2019-05-22 21:16:55 +00:00
}
t.Run("serial", func(t *testing.T) {
for i := 0; i < 3; i++ {
if err = testStorageDeleteSeries(s, 0); err != nil {
t.Fatalf("unexpected error on iteration %d: %s", i, err)
2019-05-22 21:16:55 +00:00
}
// Re-open the storage in order to check how deleted metricIDs
// are persisted.
s.MustClose()
s = MustOpenStorage(path, 0, 0, 0)
2019-05-22 21:16:55 +00:00
}
})
t.Run("concurrent", func(t *testing.T) {
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func(workerNum int) {
var err error
for j := 0; j < 2; j++ {
err = testStorageDeleteSeries(s, workerNum)
2019-05-22 21:16:55 +00:00
if err != nil {
break
}
}
ch <- err
}(i)
}
tt := time.NewTimer(30 * time.Second)
2019-05-22 21:16:55 +00:00
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error on iteration %d: %s", i, err)
2019-05-22 21:16:55 +00:00
}
case <-tt.C:
t.Fatalf("timeout on iteration %d", i)
2019-05-22 21:16:55 +00:00
}
}
})
// Verify no more tag keys exist
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
2019-05-22 21:16:55 +00:00
if err != nil {
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange after the test: %s", err)
2019-05-22 21:16:55 +00:00
}
if len(lns) != 0 {
t.Fatalf("found non-empty tag keys after the test: %q", lns)
2019-05-22 21:16:55 +00:00
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageDeleteSeries(s *Storage, workerNum int) error {
rng := rand.New(rand.NewSource(1))
2019-05-22 21:16:55 +00:00
const rowsPerMetric = 100
const metricsCount = 30
workerTag := []byte(fmt.Sprintf("workerTag_%d", workerNum))
lnsAll := make(map[string]bool)
lnsAll["__name__"] = true
2019-05-22 21:16:55 +00:00
for i := 0; i < metricsCount; i++ {
var mrs []MetricRow
var mn MetricName
job := fmt.Sprintf("job_%d_%d", i, workerNum)
instance := fmt.Sprintf("instance_%d_%d", i, workerNum)
mn.Tags = []Tag{
{[]byte("job"), []byte(job)},
{[]byte("instance"), []byte(instance)},
{workerTag, []byte("foobar")},
}
for i := range mn.Tags {
lnsAll[string(mn.Tags[i].Key)] = true
2019-05-22 21:16:55 +00:00
}
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", i, workerNum))
metricNameRaw := mn.marshalRaw(nil)
for j := 0; j < rowsPerMetric; j++ {
timestamp := rng.Int63n(1e10)
value := rng.NormFloat64() * 1e6
2019-05-22 21:16:55 +00:00
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
s.AddRows(mrs, defaultPrecisionBits)
2019-05-22 21:16:55 +00:00
}
s.DebugFlush()
2019-05-22 21:16:55 +00:00
// Verify tag values exist
tvs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange before metrics removal: %w", err)
2019-05-22 21:16:55 +00:00
}
if len(tvs) == 0 {
return fmt.Errorf("unexpected empty number of tag values for workerTag")
}
// Verify tag keys exist
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange before metrics removal: %w", err)
2019-05-22 21:16:55 +00:00
}
if err := checkLabelNames(lns, lnsAll); err != nil {
return fmt.Errorf("unexpected label names before metrics removal: %w", err)
2019-05-22 21:16:55 +00:00
}
var sr Search
tr := TimeRange{
MinTimestamp: 0,
MaxTimestamp: 2e10,
}
metricBlocksCount := func(tfs *TagFilters) int {
// Verify the number of blocks
2019-05-22 21:16:55 +00:00
n := 0
sr.Init(nil, s, []*TagFilters{tfs}, tr, 1e5, noDeadline)
2019-05-22 21:16:55 +00:00
for sr.NextMetricBlock() {
n++
}
sr.MustClose()
return n
}
for i := 0; i < metricsCount; i++ {
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte("metric_.+"), false, true); err != nil {
return fmt.Errorf("cannot add regexp tag filter: %w", err)
2019-05-22 21:16:55 +00:00
}
job := fmt.Sprintf("job_%d_%d", i, workerNum)
if err := tfs.Add([]byte("job"), []byte(job), false, false); err != nil {
return fmt.Errorf("cannot add job tag filter: %w", err)
2019-05-22 21:16:55 +00:00
}
if n := metricBlocksCount(tfs); n == 0 {
return fmt.Errorf("expecting non-zero number of metric blocks for tfs=%s", tfs)
}
deletedCount, err := s.DeleteSeries(nil, []*TagFilters{tfs}, 1e9)
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("cannot delete metrics: %w", err)
2019-05-22 21:16:55 +00:00
}
if deletedCount == 0 {
return fmt.Errorf("expecting non-zero number of deleted metrics on iteration %d", i)
2019-05-22 21:16:55 +00:00
}
if n := metricBlocksCount(tfs); n != 0 {
return fmt.Errorf("expecting zero metric blocks after DeleteSeries call for tfs=%s; got %d blocks", tfs, n)
2019-05-22 21:16:55 +00:00
}
// Try deleting empty tfss
deletedCount, err = s.DeleteSeries(nil, nil, 1e9)
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("cannot delete empty tfss: %w", err)
2019-05-22 21:16:55 +00:00
}
if deletedCount != 0 {
return fmt.Errorf("expecting zero deleted metrics for empty tfss; got %d", deletedCount)
}
}
// Make sure no more metrics left for the given workerNum
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte(fmt.Sprintf("metric_.+_%d", workerNum)), false, true); err != nil {
return fmt.Errorf("cannot add regexp tag filter for worker metrics: %w", err)
2019-05-22 21:16:55 +00:00
}
if n := metricBlocksCount(tfs); n != 0 {
return fmt.Errorf("expecting zero metric blocks after deleting all the metrics; got %d blocks", n)
}
tvs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, string(workerTag), nil, TimeRange{}, 1e5, 1e9, noDeadline)
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange after all the metrics are removed: %w", err)
2019-05-22 21:16:55 +00:00
}
if len(tvs) != 0 {
return fmt.Errorf("found non-empty tag values for %q after metrics removal: %q", workerTag, tvs)
}
return nil
}
func checkLabelNames(lns []string, lnsExpected map[string]bool) error {
if len(lns) < len(lnsExpected) {
return fmt.Errorf("unexpected number of label names found; got %d; want at least %d; lns=%q, lnsExpected=%v", len(lns), len(lnsExpected), lns, lnsExpected)
2019-05-22 21:16:55 +00:00
}
hasItem := func(s string, lns []string) bool {
for _, labelName := range lns {
if s == labelName {
2019-05-22 21:16:55 +00:00
return true
}
}
return false
}
for labelName := range lnsExpected {
if !hasItem(labelName, lns) {
return fmt.Errorf("cannot find %q in label names %q", labelName, lns)
2019-05-22 21:16:55 +00:00
}
}
return nil
}
func TestStorageDeleteSeries_TooManyTimeseries(t *testing.T) {
defer testRemoveAll(t)
const numSeries = 1000
rng := rand.New(rand.NewSource(1))
mrs := testGenerateMetricRowsWithPrefix(rng, numSeries, "metric", TimeRange{
MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
MaxTimestamp: time.Now().UnixMilli(),
})
s := MustOpenStorage(t.Name(), 0, 0, 0)
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
t.Fatalf("unexpected error in TagFilters.Add: %v", err)
}
maxSeries := numSeries - 1
count, err := s.DeleteSeries(nil, []*TagFilters{tfs}, maxSeries)
if err == nil {
t.Errorf("expected an error but there hasn't been one")
}
if count != 0 {
t.Errorf("unexpected deleted series count: got %d, want 0", count)
}
}
func TestStorageDeleteSeries_CachesAreUpdatedOrReset(t *testing.T) {
defer testRemoveAll(t)
tr := TimeRange{
MinTimestamp: time.Now().Add(-100 * 24 * time.Hour).UnixMilli(),
MaxTimestamp: time.Now().UnixMilli(),
}
mn := MetricName{MetricGroup: []byte("metric")}
mr := MetricRow{
MetricNameRaw: mn.marshalRaw(nil),
Timestamp: tr.MaxTimestamp,
Value: 123,
}
var (
genTSID generationTSID
tfssKey []byte
)
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte("metric.*"), false, true); err != nil {
t.Fatalf("unexpected error in TagFilters.Add: %v", err)
}
tfss := []*TagFilters{tfs}
s := MustOpenStorage(t.Name(), 0, 0, 0)
defer s.MustClose()
// Ensure caches are empty.
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
}
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
}
if got := s.getDeletedMetricIDs().Len(); got != 0 {
t.Fatalf("deletedMetricIDs cache: unexpected size: got %d, want empty", got)
}
// Add one row, search it, and ensure that the tsidCache and
// tagFiltersToMetricIDsCache are not empty but the deletedMetricIDs
// cache is still empty.
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
s.DebugFlush()
gotMetrics, err := s.SearchMetricNames(nil, tfss, tr, 1, noDeadline)
if err != nil {
t.Fatalf("SearchMetricNames() failed unexpectedly: %v", err)
}
wantMetrics := []string{string(mr.MetricNameRaw)}
if reflect.DeepEqual(gotMetrics, wantMetrics) {
t.Fatalf("SearchMetricNames() unexpected search result: got %v, want %v", gotMetrics, wantMetrics)
}
if !s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
t.Fatalf("tsidCache was expected to contain a record but it did not")
}
metricID := genTSID.TSID.MetricID
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
if _, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); !ok {
t.Fatalf("tagFiltersToMetricIDsCache was expected to contain a record but it did not")
}
if got := s.getDeletedMetricIDs().Len(); got != 0 {
t.Fatalf("deletedMetricIDs cache unexpected size: got %d, want empty", got)
}
// Delete the metric added earlier and ensure that the tsidCache and
// tagFiltersToMetricIDsCache have been reset and the deletedMetricIDs
// cache is now contains ID of the deleted metric.
numDeletedSeries, err := s.DeleteSeries(nil, tfss, 1)
if err != nil {
t.Fatalf("DeleteSeries() failed unexpectedly: %v", err)
}
if got, want := numDeletedSeries, 1; got != want {
t.Fatalf("unexpected number of deleted series, got %d, want %d", got, want)
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
t.Fatalf("tsidCache unexpected contents: got %v, want empty", genTSID)
}
tfssKey = marshalTagFiltersKey(nil, tfss, tr, true)
if got, ok := s.idb().getMetricIDsFromTagFiltersCache(nil, tfssKey); ok {
t.Fatalf("tagFiltersToMetricIDsCache unexpected contents: got %v, want empty", got)
}
if got, want := s.getDeletedMetricIDs().AppendTo(nil), []uint64{metricID}; !reflect.DeepEqual(got, want) {
t.Fatalf("deletedMetricIDs cache: unexpected contents: got %v, want %v", got, want)
}
}
func TestStorageRegisterMetricNamesSerial(t *testing.T) {
path := "TestStorageRegisterMetricNamesSerial"
s := MustOpenStorage(path, 0, 0, 0)
if err := testStorageRegisterMetricNames(s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageRegisterMetricNamesConcurrent(t *testing.T) {
path := "TestStorageRegisterMetricNamesConcurrent"
s := MustOpenStorage(path, 0, 0, 0)
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func() {
ch <- testStorageRegisterMetricNames(s)
}()
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageRegisterMetricNames(s *Storage) error {
const metricsPerAdd = 1e3
const addsCount = 10
addIDsMap := make(map[string]struct{})
for i := 0; i < addsCount; i++ {
var mrs []MetricRow
var mn MetricName
addID := fmt.Sprintf("%d", i)
addIDsMap[addID] = struct{}{}
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
{[]byte("add_id"), []byte(addID)},
}
now := timestampFromTime(time.Now())
for j := 0; j < metricsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", j))
metricNameRaw := mn.marshalRaw(nil)
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: now,
}
mrs = append(mrs, mr)
}
lib/storage: switch from global to per-day index for `MetricName -> TSID` mapping Previously all the newly ingested time series were registered in global `MetricName -> TSID` index. This index was used during data ingestion for locating the TSID (internal series id) for the given canonical metric name (the canonical metric name consists of metric name plus all its labels sorted by label names). The `MetricName -> TSID` index is stored on disk in order to make sure that the data isn't lost on VictoriaMetrics restart or unclean shutdown. The lookup in this index is relatively slow, since VictoriaMetrics needs to read the corresponding data block from disk, unpack it, put the unpacked block into `indexdb/dataBlocks` cache, and then search for the given `MetricName -> TSID` entry there. So VictoriaMetrics uses in-memory cache for speeding up the lookup for active time series. This cache is named `storage/tsid`. If this cache capacity is enough for all the currently ingested active time series, then VictoriaMetrics works fast, since it doesn't need to read the data from disk. VictoriaMetrics starts reading data from `MetricName -> TSID` on-disk index in the following cases: - If `storage/tsid` cache capacity isn't enough for active time series. Then just increase available memory for VictoriaMetrics or reduce the number of active time series ingested into VictoriaMetrics. - If new time series is ingested into VictoriaMetrics. In this case it cannot find the needed entry in the `storage/tsid` cache, so it needs to consult on-disk `MetricName -> TSID` index, since it doesn't know that the index has no the corresponding entry too. This is a typical event under high churn rate, when old time series are constantly substituted with new time series. Reading the data from `MetricName -> TSID` index is slow, so inserts, which lead to reading this index, are counted as slow inserts, and they can be monitored via `vm_slow_row_inserts_total` metric exposed by VictoriaMetrics. Prior to this commit the `MetricName -> TSID` index was global, e.g. it contained entries sorted by `MetricName` for all the time series ever ingested into VictoriaMetrics during the configured -retentionPeriod. This index can become very large under high churn rate and long retention. VictoriaMetrics caches data from this index in `indexdb/dataBlocks` in-memory cache for speeding up index lookups. The `indexdb/dataBlocks` cache may occupy significant share of available memory for storing recently accessed blocks at `MetricName -> TSID` index when searching for newly ingested time series. This commit switches from global `MetricName -> TSID` index to per-day index. This allows significantly reducing the amounts of data, which needs to be cached in `indexdb/dataBlocks`, since now VictoriaMetrics consults only the index for the current day when new time series is ingested into it. The downside of this change is increased indexdb size on disk for workloads without high churn rate, e.g. with static time series, which do no change over time, since now VictoriaMetrics needs to store identical `MetricName -> TSID` entries for static time series for every day. This change removes an optimization for reducing CPU and disk IO spikes at indexdb rotation, since it didn't work correctly - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 . At the same time the change fixes the issue, which could result in lost access to time series, which stop receving new samples during the first hour after indexdb rotation - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 The issue with the increased CPU and disk IO usage during indexdb rotation will be addressed in a separate commit according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401#issuecomment-1553488685 This is a follow-up for 1f28b46ae9350795af41cbfc3ca0e8a5af084fce
2023-07-13 22:33:41 +00:00
s.RegisterMetricNames(nil, mrs)
}
var addIDsExpected []string
for k := range addIDsMap {
addIDsExpected = append(addIDsExpected, k)
}
sort.Strings(addIDsExpected)
// Verify the storage contains the added metric names.
s.DebugFlush()
// Verify that SearchLabelNamesWithFiltersOnTimeRange returns correct result.
lnsExpected := []string{
"__name__",
"add_id",
"instance",
"job",
}
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 100, 1e9, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, lnsExpected) {
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
}
// Verify that SearchLabelNamesWithFiltersOnTimeRange with the specified time range returns correct result.
now := timestampFromTime(time.Now())
start := now - msecPerDay
end := now + 60*1000
tr := TimeRange{
MinTimestamp: start,
MaxTimestamp: end,
}
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, tr, 100, 1e9, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchLabelNamesWithFiltersOnTimeRange: %w", err)
}
sort.Strings(lns)
if !reflect.DeepEqual(lns, lnsExpected) {
return fmt.Errorf("unexpected label names returned from SearchLabelNamesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", lns, lnsExpected)
}
// Verify that SearchLabelValuesWithFiltersOnTimeRange returns correct result.
addIDs, err := s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, TimeRange{}, addsCount+100, 1e9, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
}
sort.Strings(addIDs)
if !reflect.DeepEqual(addIDs, addIDsExpected) {
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
}
// Verify that SearchLabelValuesWithFiltersOnTimeRange with the specified time range returns correct result.
addIDs, err = s.SearchLabelValuesWithFiltersOnTimeRange(nil, "add_id", nil, tr, addsCount+100, 1e9, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchLabelValuesWithFiltersOnTimeRange: %w", err)
}
sort.Strings(addIDs)
if !reflect.DeepEqual(addIDs, addIDsExpected) {
return fmt.Errorf("unexpected tag values returned from SearchLabelValuesWithFiltersOnTimeRange;\ngot\n%q\nwant\n%q", addIDs, addIDsExpected)
}
// Verify that SearchMetricNames returns correct result.
tfs := NewTagFilters()
if err := tfs.Add([]byte("add_id"), []byte("0"), false, false); err != nil {
return fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
}
metricNames, err := s.SearchMetricNames(nil, []*TagFilters{tfs}, tr, metricsPerAdd*addsCount*100+100, noDeadline)
if err != nil {
return fmt.Errorf("error in SearchMetricNames: %w", err)
}
if len(metricNames) < metricsPerAdd {
return fmt.Errorf("unexpected number of metricNames returned from SearchMetricNames; got %d; want at least %d", len(metricNames), int(metricsPerAdd))
}
var mn MetricName
for i, metricName := range metricNames {
if err := mn.UnmarshalString(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName=%q: %w", metricName, err)
}
addID := mn.GetTagValue("add_id")
if string(addID) != "0" {
return fmt.Errorf("unexpected addID for metricName #%d; got %q; want %q", i, addID, "0")
}
job := mn.GetTagValue("job")
if string(job) != "webservice" {
return fmt.Errorf("unexpected job for metricName #%d; got %q; want %q", i, job, "webservice")
}
}
return nil
}
func TestStorageAddRowsSerial(t *testing.T) {
rng := rand.New(rand.NewSource(1))
path := "TestStorageAddRowsSerial"
retention := 10 * retention31Days
s := MustOpenStorage(path, retention, 1e5, 1e5)
if err := testStorageAddRows(rng, s); err != nil {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestStorageAddRowsConcurrent(t *testing.T) {
path := "TestStorageAddRowsConcurrent"
retention := 10 * retention31Days
s := MustOpenStorage(path, retention, 1e5, 1e5)
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func(n int) {
rLocal := rand.New(rand.NewSource(int64(n)))
ch <- testStorageAddRows(rLocal, s)
}(i)
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
2019-05-22 21:16:55 +00:00
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
2019-05-22 21:16:55 +00:00
}
}
2019-05-22 21:16:55 +00:00
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testGenerateMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
lib/storage: properly handle maxMetrics limit at metricID search `TL;DR` This PR improves the metric IDs search in IndexDB: - Avoid seaching for metric IDs twice when `maxMetrics` limit is exceeded - Use correct error type for indicating that the `maxMetrics` limit is exceded - Simplify the logic of deciding between per-day and global index search A unit test has been added to ensure that this refactoring does not break anything. --- Function calls before the fix: ``` idb.searchMetricIDs |__ is.searchMetricIDs |__ is.searchMetricIDsInternal |__ is.updateMetricIDsForTagFilters |__ is.tryUpdatingMetricIDsForDateRange | | |__ is.getMetricIDsForDateAndFilters ``` - `searchMetricIDsInternal` searches metric IDs for each filter set. It maintains a metric ID set variable which is updated every time the `updateMetricIDsForTagFilters` function is called. After each successful call, the function checks the length of the updated metric ID set and if it is greater than `maxMetrics`, the function returns `too many timeseries` error. - `updateMetricIDsForTagFilters` uses either per-day or global index to search metric IDs for the given filter set. The decision of which index to use is made is made within the `tryUpdatingMetricIDsForDateRange` function and if it returns `fallback to global search` error then the function uses global index by calling `getMetricIDsForDateAndFilters` with zero date. - `tryUpdatingMetricIDsForDateRange` first checks if the given time range is larger than 40 days and if so returns `fallback to global search` error. Otherwise it proceeds to searching for metric IDs within that time range by calling `getMetricIDsForDateAndFilters` for each date. - `getMetricIDsForDateAndFilters` searches for metric IDs for the given date and returns `fallback to global search` error if the number of found metric IDs is greater than `maxMetrics`. Problems with this solution: 1. The `fallback to global search` error returned by `getMetricIDsForDateAndFilters` in case when maxMetrics is exceeded is misleading. 2. If `tryUpdatingMetricIDsForDateRange` proceeds to date range search and returns `fallback to global search` error (because `getMetricIDsForDateAndFilters` returns it) then this will trigger global search in `updateMetricIDsForTagFilters`. However the global search uses the same maxMetrics value which means this search is destined to fail too. I.e. the same search is performed twice and fails twice. 3. `too many timeseries` error is already handled in `searchMetricIDsInternal` and therefore handing this error in `updateMetricIDsForTagFilters` is redundant 4. updateMetricIDsForTagFilters is a better place to make a decision on whether to use per-day or global index. Solution: 1. Use a dedicated error for `too many timeseries` case 2. Handle `too many timeseries` error in `searchMetricIDsInternal` only 3. Move the per-day or global search decision from `tryUpdatingMetricIDsForDateRange` to `updateMetricIDsForTagFilters` and remove `fallback to global search` error. --------- Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-08-27 19:39:03 +00:00
return testGenerateMetricRowsWithPrefix(rng, rows, "metric", TimeRange{timestampMin, timestampMax})
}
func testGenerateMetricRowsWithPrefix(rng *rand.Rand, rows uint64, prefix string, tr TimeRange) []MetricRow {
lib/index: reduce read/write load after indexDB rotation (#2177) * lib/index: reduce read/write load after indexDB rotation IndexDB in VM is responsible for storing TSID - ID's used for identifying time series. The index is stored on disk and used by both ingestion and read path. IndexDB is stored separately to data parts and is global for all stored data. It can't be deleted partially as VM deletes data parts. Instead, indexDB is rotated once in `retention` interval. The rotation procedure means that `current` indexDB becomes `previous`, and new freshly created indexDB struct becomes `current`. So in any time, VM holds indexDB for current and previous retention periods. When time series is ingested or queried, VM checks if its TSID is present in `current` indexDB. If it is missing, it checks the `previous` indexDB. If TSID was found, it gets copied to the `current` indexDB. In this way `current` indexDB stores only series which were active during the retention period. To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both write and read path consult `tsidCache` and on miss the relad lookup happens. When rotation happens, VM resets the `tsidCache`. This is needed for ingestion path to trigger `current` indexDB re-population. Since index re-population requires additional resources, every index rotation event may cause some extra load on CPU and disk. While it may be unnoticeable for most of the cases, for systems with very high number of unique series each rotation may lead to performance degradation for some period of time. This PR makes an attempt to smooth out resource usage after the rotation. The changes are following: 1. `tsidCache` is no longer reset after the rotation; 2. Instead, each entry in `tsidCache` gains a notion of indexDB to which they belong; 3. On ingestion path after the rotation we check if requested TSID was found in `tsidCache`. Then we have 3 branches: 3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID. 3.2 Slow path. It wasn't found, so we generate it from scratch, add to `current` indexDB, add it to `tsidCache`. 3.3 Smooth path. It was found but does not belong to the `current` indexDB. In this case, we add it to the `current` indexDB with some probability. The probability is based on time passed since the last rotation with some threshold. The more time has passed since rotation the higher is chance to re-populate `current` indexDB. The default re-population interval in this PR is set to `1h`, during which entries from `previous` index supposed to slowly re-populate `current` index. The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs were moved from `previous` indexDB to the `current` indexDB. This metric supposed to grow only during the first `1h` after the last rotation. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 Signed-off-by: hagen1778 <roman@victoriametrics.com> * wip * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for i := 0; i < int(rows); i++ {
lib/storage: properly handle maxMetrics limit at metricID search `TL;DR` This PR improves the metric IDs search in IndexDB: - Avoid seaching for metric IDs twice when `maxMetrics` limit is exceeded - Use correct error type for indicating that the `maxMetrics` limit is exceded - Simplify the logic of deciding between per-day and global index search A unit test has been added to ensure that this refactoring does not break anything. --- Function calls before the fix: ``` idb.searchMetricIDs |__ is.searchMetricIDs |__ is.searchMetricIDsInternal |__ is.updateMetricIDsForTagFilters |__ is.tryUpdatingMetricIDsForDateRange | | |__ is.getMetricIDsForDateAndFilters ``` - `searchMetricIDsInternal` searches metric IDs for each filter set. It maintains a metric ID set variable which is updated every time the `updateMetricIDsForTagFilters` function is called. After each successful call, the function checks the length of the updated metric ID set and if it is greater than `maxMetrics`, the function returns `too many timeseries` error. - `updateMetricIDsForTagFilters` uses either per-day or global index to search metric IDs for the given filter set. The decision of which index to use is made is made within the `tryUpdatingMetricIDsForDateRange` function and if it returns `fallback to global search` error then the function uses global index by calling `getMetricIDsForDateAndFilters` with zero date. - `tryUpdatingMetricIDsForDateRange` first checks if the given time range is larger than 40 days and if so returns `fallback to global search` error. Otherwise it proceeds to searching for metric IDs within that time range by calling `getMetricIDsForDateAndFilters` for each date. - `getMetricIDsForDateAndFilters` searches for metric IDs for the given date and returns `fallback to global search` error if the number of found metric IDs is greater than `maxMetrics`. Problems with this solution: 1. The `fallback to global search` error returned by `getMetricIDsForDateAndFilters` in case when maxMetrics is exceeded is misleading. 2. If `tryUpdatingMetricIDsForDateRange` proceeds to date range search and returns `fallback to global search` error (because `getMetricIDsForDateAndFilters` returns it) then this will trigger global search in `updateMetricIDsForTagFilters`. However the global search uses the same maxMetrics value which means this search is destined to fail too. I.e. the same search is performed twice and fails twice. 3. `too many timeseries` error is already handled in `searchMetricIDsInternal` and therefore handing this error in `updateMetricIDsForTagFilters` is redundant 4. updateMetricIDsForTagFilters is a better place to make a decision on whether to use per-day or global index. Solution: 1. Use a dedicated error for `too many timeseries` case 2. Handle `too many timeseries` error in `searchMetricIDsInternal` only 3. Move the per-day or global search decision from `tryUpdatingMetricIDsForDateRange` to `updateMetricIDsForTagFilters` and remove `fallback to global search` error. --------- Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-08-27 19:39:03 +00:00
mn.MetricGroup = []byte(fmt.Sprintf("%s_%d", prefix, i))
lib/index: reduce read/write load after indexDB rotation (#2177) * lib/index: reduce read/write load after indexDB rotation IndexDB in VM is responsible for storing TSID - ID's used for identifying time series. The index is stored on disk and used by both ingestion and read path. IndexDB is stored separately to data parts and is global for all stored data. It can't be deleted partially as VM deletes data parts. Instead, indexDB is rotated once in `retention` interval. The rotation procedure means that `current` indexDB becomes `previous`, and new freshly created indexDB struct becomes `current`. So in any time, VM holds indexDB for current and previous retention periods. When time series is ingested or queried, VM checks if its TSID is present in `current` indexDB. If it is missing, it checks the `previous` indexDB. If TSID was found, it gets copied to the `current` indexDB. In this way `current` indexDB stores only series which were active during the retention period. To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both write and read path consult `tsidCache` and on miss the relad lookup happens. When rotation happens, VM resets the `tsidCache`. This is needed for ingestion path to trigger `current` indexDB re-population. Since index re-population requires additional resources, every index rotation event may cause some extra load on CPU and disk. While it may be unnoticeable for most of the cases, for systems with very high number of unique series each rotation may lead to performance degradation for some period of time. This PR makes an attempt to smooth out resource usage after the rotation. The changes are following: 1. `tsidCache` is no longer reset after the rotation; 2. Instead, each entry in `tsidCache` gains a notion of indexDB to which they belong; 3. On ingestion path after the rotation we check if requested TSID was found in `tsidCache`. Then we have 3 branches: 3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID. 3.2 Slow path. It wasn't found, so we generate it from scratch, add to `current` indexDB, add it to `tsidCache`. 3.3 Smooth path. It was found but does not belong to the `current` indexDB. In this case, we add it to the `current` indexDB with some probability. The probability is based on time passed since the last rotation with some threshold. The more time has passed since rotation the higher is chance to re-populate `current` indexDB. The default re-population interval in this PR is set to `1h`, during which entries from `previous` index supposed to slowly re-populate `current` index. The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs were moved from `previous` indexDB to the `current` indexDB. This metric supposed to grow only during the first `1h` after the last rotation. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 Signed-off-by: hagen1778 <roman@victoriametrics.com> * wip * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
metricNameRaw := mn.marshalRaw(nil)
lib/storage: properly handle maxMetrics limit at metricID search `TL;DR` This PR improves the metric IDs search in IndexDB: - Avoid seaching for metric IDs twice when `maxMetrics` limit is exceeded - Use correct error type for indicating that the `maxMetrics` limit is exceded - Simplify the logic of deciding between per-day and global index search A unit test has been added to ensure that this refactoring does not break anything. --- Function calls before the fix: ``` idb.searchMetricIDs |__ is.searchMetricIDs |__ is.searchMetricIDsInternal |__ is.updateMetricIDsForTagFilters |__ is.tryUpdatingMetricIDsForDateRange | | |__ is.getMetricIDsForDateAndFilters ``` - `searchMetricIDsInternal` searches metric IDs for each filter set. It maintains a metric ID set variable which is updated every time the `updateMetricIDsForTagFilters` function is called. After each successful call, the function checks the length of the updated metric ID set and if it is greater than `maxMetrics`, the function returns `too many timeseries` error. - `updateMetricIDsForTagFilters` uses either per-day or global index to search metric IDs for the given filter set. The decision of which index to use is made is made within the `tryUpdatingMetricIDsForDateRange` function and if it returns `fallback to global search` error then the function uses global index by calling `getMetricIDsForDateAndFilters` with zero date. - `tryUpdatingMetricIDsForDateRange` first checks if the given time range is larger than 40 days and if so returns `fallback to global search` error. Otherwise it proceeds to searching for metric IDs within that time range by calling `getMetricIDsForDateAndFilters` for each date. - `getMetricIDsForDateAndFilters` searches for metric IDs for the given date and returns `fallback to global search` error if the number of found metric IDs is greater than `maxMetrics`. Problems with this solution: 1. The `fallback to global search` error returned by `getMetricIDsForDateAndFilters` in case when maxMetrics is exceeded is misleading. 2. If `tryUpdatingMetricIDsForDateRange` proceeds to date range search and returns `fallback to global search` error (because `getMetricIDsForDateAndFilters` returns it) then this will trigger global search in `updateMetricIDsForTagFilters`. However the global search uses the same maxMetrics value which means this search is destined to fail too. I.e. the same search is performed twice and fails twice. 3. `too many timeseries` error is already handled in `searchMetricIDsInternal` and therefore handing this error in `updateMetricIDsForTagFilters` is redundant 4. updateMetricIDsForTagFilters is a better place to make a decision on whether to use per-day or global index. Solution: 1. Use a dedicated error for `too many timeseries` case 2. Handle `too many timeseries` error in `searchMetricIDsInternal` only 3. Move the per-day or global search decision from `tryUpdatingMetricIDsForDateRange` to `updateMetricIDsForTagFilters` and remove `fallback to global search` error. --------- Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-08-27 19:39:03 +00:00
timestamp := rng.Int63n(tr.MaxTimestamp-tr.MinTimestamp) + tr.MinTimestamp
value := rng.NormFloat64() * 1e6
lib/index: reduce read/write load after indexDB rotation (#2177) * lib/index: reduce read/write load after indexDB rotation IndexDB in VM is responsible for storing TSID - ID's used for identifying time series. The index is stored on disk and used by both ingestion and read path. IndexDB is stored separately to data parts and is global for all stored data. It can't be deleted partially as VM deletes data parts. Instead, indexDB is rotated once in `retention` interval. The rotation procedure means that `current` indexDB becomes `previous`, and new freshly created indexDB struct becomes `current`. So in any time, VM holds indexDB for current and previous retention periods. When time series is ingested or queried, VM checks if its TSID is present in `current` indexDB. If it is missing, it checks the `previous` indexDB. If TSID was found, it gets copied to the `current` indexDB. In this way `current` indexDB stores only series which were active during the retention period. To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both write and read path consult `tsidCache` and on miss the relad lookup happens. When rotation happens, VM resets the `tsidCache`. This is needed for ingestion path to trigger `current` indexDB re-population. Since index re-population requires additional resources, every index rotation event may cause some extra load on CPU and disk. While it may be unnoticeable for most of the cases, for systems with very high number of unique series each rotation may lead to performance degradation for some period of time. This PR makes an attempt to smooth out resource usage after the rotation. The changes are following: 1. `tsidCache` is no longer reset after the rotation; 2. Instead, each entry in `tsidCache` gains a notion of indexDB to which they belong; 3. On ingestion path after the rotation we check if requested TSID was found in `tsidCache`. Then we have 3 branches: 3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID. 3.2 Slow path. It wasn't found, so we generate it from scratch, add to `current` indexDB, add it to `tsidCache`. 3.3 Smooth path. It was found but does not belong to the `current` indexDB. In this case, we add it to the `current` indexDB with some probability. The probability is based on time passed since the last rotation with some threshold. The more time has passed since rotation the higher is chance to re-populate `current` indexDB. The default re-population interval in this PR is set to `1h`, during which entries from `previous` index supposed to slowly re-populate `current` index. The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs were moved from `previous` indexDB to the `current` indexDB. This metric supposed to grow only during the first `1h` after the last rotation. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 Signed-off-by: hagen1778 <roman@victoriametrics.com> * wip * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
return mrs
}
func testStorageAddRows(rng *rand.Rand, s *Storage) error {
2019-05-22 21:16:55 +00:00
const rowsPerAdd = 1e3
const addsCount = 10
maxTimestamp := timestampFromTime(time.Now())
minTimestamp := maxTimestamp - s.retentionMsecs + 3600*1000
2019-05-22 21:16:55 +00:00
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
s.AddRows(mrs, defaultPrecisionBits)
2019-05-22 21:16:55 +00:00
}
// Verify the storage contains rows.
lib/index: reduce read/write load after indexDB rotation (#2177) * lib/index: reduce read/write load after indexDB rotation IndexDB in VM is responsible for storing TSID - ID's used for identifying time series. The index is stored on disk and used by both ingestion and read path. IndexDB is stored separately to data parts and is global for all stored data. It can't be deleted partially as VM deletes data parts. Instead, indexDB is rotated once in `retention` interval. The rotation procedure means that `current` indexDB becomes `previous`, and new freshly created indexDB struct becomes `current`. So in any time, VM holds indexDB for current and previous retention periods. When time series is ingested or queried, VM checks if its TSID is present in `current` indexDB. If it is missing, it checks the `previous` indexDB. If TSID was found, it gets copied to the `current` indexDB. In this way `current` indexDB stores only series which were active during the retention period. To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both write and read path consult `tsidCache` and on miss the relad lookup happens. When rotation happens, VM resets the `tsidCache`. This is needed for ingestion path to trigger `current` indexDB re-population. Since index re-population requires additional resources, every index rotation event may cause some extra load on CPU and disk. While it may be unnoticeable for most of the cases, for systems with very high number of unique series each rotation may lead to performance degradation for some period of time. This PR makes an attempt to smooth out resource usage after the rotation. The changes are following: 1. `tsidCache` is no longer reset after the rotation; 2. Instead, each entry in `tsidCache` gains a notion of indexDB to which they belong; 3. On ingestion path after the rotation we check if requested TSID was found in `tsidCache`. Then we have 3 branches: 3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID. 3.2 Slow path. It wasn't found, so we generate it from scratch, add to `current` indexDB, add it to `tsidCache`. 3.3 Smooth path. It was found but does not belong to the `current` indexDB. In this case, we add it to the `current` indexDB with some probability. The probability is based on time passed since the last rotation with some threshold. The more time has passed since rotation the higher is chance to re-populate `current` indexDB. The default re-population interval in this PR is set to `1h`, during which entries from `previous` index supposed to slowly re-populate `current` index. The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs were moved from `previous` indexDB to the `current` indexDB. This metric supposed to grow only during the first `1h` after the last rotation. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 Signed-off-by: hagen1778 <roman@victoriametrics.com> * wip * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-11 22:30:08 +00:00
minRowsExpected := uint64(rowsPerAdd * addsCount)
2019-05-22 21:16:55 +00:00
var m Metrics
s.UpdateMetrics(&m)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
2019-05-22 21:16:55 +00:00
}
// Try creating a snapshot from the storage.
snapshotName, err := s.CreateSnapshot()
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt.Errorf("cannot create snapshot from the storage: %w", err)
2019-05-22 21:16:55 +00:00
}
// Verify the snapshot is visible
snapshots, err := s.ListSnapshots()
if err != nil {
return fmt.Errorf("cannot list snapshots: %w", err)
2019-05-22 21:16:55 +00:00
}
if !containsString(snapshots, snapshotName) {
return fmt.Errorf("cannot find snapshot %q in %q", snapshotName, snapshots)
}
// Try opening the storage from snapshot.
snapshotPath := filepath.Join(s.path, snapshotsDirname, snapshotName)
s1 := MustOpenStorage(snapshotPath, 0, 0, 0)
2019-05-22 21:16:55 +00:00
// Verify the snapshot contains rows
var m1 Metrics
s1.UpdateMetrics(&m1)
if rowsCount := m1.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, rowsCount)
2019-05-22 21:16:55 +00:00
}
// Verify that force merge for the snapshot leaves at most a single part per partition.
// Zero parts are possible if the snapshot is created just after the partition has been created
// by concurrent goroutine, but it didn't put the data into it yet.
if err := s1.ForceMergePartitions(""); err != nil {
return fmt.Errorf("error when force merging partitions: %w", err)
}
ptws := s1.tb.GetPartitions(nil)
for _, ptw := range ptws {
pws := ptw.pt.GetParts(nil, true)
numParts := len(pws)
ptw.pt.PutParts(pws)
if numParts > 1 {
s1.tb.PutPartitions(ptws)
return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want at most 1", ptw.pt.name, numParts)
}
}
s1.tb.PutPartitions(ptws)
2019-05-22 21:16:55 +00:00
s1.MustClose()
// Delete the snapshot and make sure it is no longer visible.
if err := s.DeleteSnapshot(snapshotName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
2019-05-22 21:16:55 +00:00
}
snapshots, err = s.ListSnapshots()
if err != nil {
return fmt.Errorf("cannot list snapshots: %w", err)
2019-05-22 21:16:55 +00:00
}
if containsString(snapshots, snapshotName) {
return fmt.Errorf("snapshot %q must be deleted, but is still visible in %q", snapshotName, snapshots)
}
return nil
}
func TestStorageRotateIndexDB(t *testing.T) {
path := "TestStorageRotateIndexDB"
s := MustOpenStorage(path, 0, 0, 0)
2019-05-22 21:16:55 +00:00
// Start indexDB rotater in a separate goroutine
stopCh := make(chan struct{})
rotateDoneCh := make(chan struct{})
go func() {
for {
select {
case <-stopCh:
close(rotateDoneCh)
return
default:
time.Sleep(time.Millisecond)
s.mustRotateIndexDB(time.Now())
2019-05-22 21:16:55 +00:00
}
}
}()
// Run concurrent workers that insert / select data from the storage.
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
go func(workerNum int) {
ch <- testStorageAddMetrics(s, workerNum)
}(i)
}
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(10 * time.Second):
t.Fatalf("timeout")
}
}
close(stopCh)
<-rotateDoneCh
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func testStorageAddMetrics(s *Storage, workerNum int) error {
rng := rand.New(rand.NewSource(1))
2019-05-22 21:16:55 +00:00
const rowsCount = 1e3
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte(fmt.Sprintf("webservice_%d", workerNum))},
{[]byte("instance"), []byte("1.2.3.4")},
}
for i := 0; i < rowsCount; i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d_%d", workerNum, rng.Intn(10)))
2019-05-22 21:16:55 +00:00
metricNameRaw := mn.marshalRaw(nil)
timestamp := rng.Int63n(1e10)
value := rng.NormFloat64() * 1e6
2019-05-22 21:16:55 +00:00
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
s.AddRows([]MetricRow{mr}, defaultPrecisionBits)
2019-05-22 21:16:55 +00:00
}
// Verify the storage contains rows.
minRowsExpected := uint64(rowsCount)
var m Metrics
s.UpdateMetrics(&m)
if rowsCount := m.TableMetrics.TotalRowsCount(); rowsCount < minRowsExpected {
return fmt.Errorf("expecting at least %d rows in the table; got %d", minRowsExpected, rowsCount)
2019-05-22 21:16:55 +00:00
}
return nil
}
func TestStorageDeleteStaleSnapshots(t *testing.T) {
rng := rand.New(rand.NewSource(1))
path := "TestStorageDeleteStaleSnapshots"
retention := 10 * retention31Days
s := MustOpenStorage(path, retention, 1e5, 1e5)
const rowsPerAdd = 1e3
const addsCount = 10
maxTimestamp := timestampFromTime(time.Now())
minTimestamp := maxTimestamp - s.retentionMsecs
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rng, rowsPerAdd, minTimestamp, maxTimestamp)
s.AddRows(mrs, defaultPrecisionBits)
}
// Try creating a snapshot from the storage.
snapshotName, err := s.CreateSnapshot()
if err != nil {
t.Fatalf("cannot create snapshot from the storage: %s", err)
}
// Delete snapshots older than 1 month
if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil {
t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err)
}
snapshots, err := s.ListSnapshots()
if err != nil {
t.Fatalf("cannot list snapshots: %s", err)
}
if len(snapshots) != 1 {
t.Fatalf("expecting one snapshot; got %q", snapshots)
}
if snapshots[0] != snapshotName {
t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots)
}
// Delete the snapshot which is older than 1 nanoseconds
time.Sleep(2 * time.Nanosecond)
if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil {
t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err)
}
snapshots, err = s.ListSnapshots()
if err != nil {
t.Fatalf("cannot list snapshots: %s", err)
}
if len(snapshots) != 0 {
t.Fatalf("expecting zero snapshots; got %q", snapshots)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
// testRemoveAll removes all storage data produced by a test if the test hasn't
// failed. For this to work, the storage must use t.Name() as the base dir in
// its data path.
//
// In case of failure, the data is kept for further debugging.
func testRemoveAll(t *testing.T) {
defer func() {
if !t.Failed() {
fs.MustRemoveAll(t.Name())
}
}()
}
func TestStorageRowsNotAdded(t *testing.T) {
defer testRemoveAll(t)
type options struct {
name string
retention time.Duration
mrs []MetricRow
tr TimeRange
wantMetrics *Metrics
}
f := func(opts *options) {
t.Helper()
var gotMetrics Metrics
path := fmt.Sprintf("%s/%s", t.Name(), opts.name)
s := MustOpenStorage(path, opts.retention, 0, 0)
defer s.MustClose()
s.AddRows(opts.mrs, defaultPrecisionBits)
s.DebugFlush()
s.UpdateMetrics(&gotMetrics)
got := testCountAllMetricNames(s, opts.tr)
if got != 0 {
t.Fatalf("unexpected metric name count: got %d, want 0", got)
}
if got, want := gotMetrics.RowsReceivedTotal, opts.wantMetrics.RowsReceivedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
}
if got, want := gotMetrics.RowsAddedTotal, opts.wantMetrics.RowsAddedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
}
if got, want := gotMetrics.InvalidRawMetricNames, opts.wantMetrics.InvalidRawMetricNames; got != want {
t.Fatalf("unexpected Metrics.InvalidRawMetricNames: got %d, want %d", got, want)
}
}
const numRows = 1000
var (
rng = rand.New(rand.NewSource(1))
retention time.Duration
minTimestamp int64
maxTimestamp int64
mrs []MetricRow
)
minTimestamp = -1000
maxTimestamp = -1
f(&options{
name: "NegativeTimestamps",
retention: retentionMax,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooSmallTimestampRows: numRows,
},
})
retention = 48 * time.Hour
minTimestamp = time.Now().Add(-retention - time.Hour).UnixMilli()
maxTimestamp = minTimestamp + 1000
f(&options{
name: "TooSmallTimestamps",
retention: retention,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooSmallTimestampRows: numRows,
},
})
retention = 48 * time.Hour
minTimestamp = time.Now().Add(7 * 24 * time.Hour).UnixMilli()
maxTimestamp = minTimestamp + 1000
f(&options{
name: "TooBigTimestamps",
retention: retention,
mrs: testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp),
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
TooBigTimestampRows: numRows,
},
})
minTimestamp = time.Now().UnixMilli()
maxTimestamp = minTimestamp + 1000
mrs = testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
for i := range numRows {
mrs[i].MetricNameRaw = []byte("garbage")
}
f(&options{
name: "InvalidMetricNameRaw",
mrs: mrs,
tr: TimeRange{minTimestamp, maxTimestamp},
wantMetrics: &Metrics{
RowsReceivedTotal: numRows,
InvalidRawMetricNames: numRows,
},
})
}
func TestStorageRowsNotAdded_SeriesLimitExceeded(t *testing.T) {
defer testRemoveAll(t)
f := func(name string, maxHourlySeries int, maxDailySeries int) {
t.Helper()
rng := rand.New(rand.NewSource(1))
numRows := uint64(1000)
minTimestamp := time.Now().UnixMilli()
maxTimestamp := minTimestamp + 1000
mrs := testGenerateMetricRows(rng, numRows, minTimestamp, maxTimestamp)
var gotMetrics Metrics
path := fmt.Sprintf("%s/%s", t.Name(), name)
s := MustOpenStorage(path, 0, maxHourlySeries, maxDailySeries)
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
s.UpdateMetrics(&gotMetrics)
if got, want := gotMetrics.RowsReceivedTotal, numRows; got != want {
t.Fatalf("unexpected Metrics.RowsReceivedTotal: got %d, want %d", got, want)
}
if got := gotMetrics.HourlySeriesLimitRowsDropped; maxHourlySeries > 0 && got <= 0 {
t.Fatalf("unexpected Metrics.HourlySeriesLimitRowsDropped: got %d, want > 0", got)
}
if got := gotMetrics.DailySeriesLimitRowsDropped; maxDailySeries > 0 && got <= 0 {
t.Fatalf("unexpected Metrics.DailySeriesLimitRowsDropped: got %d, want > 0", got)
}
want := numRows - (gotMetrics.HourlySeriesLimitRowsDropped + gotMetrics.DailySeriesLimitRowsDropped)
if got := testCountAllMetricNames(s, TimeRange{minTimestamp, maxTimestamp}); uint64(got) != want {
t.Fatalf("unexpected metric name count: %d, want %d", got, want)
}
if got := gotMetrics.RowsAddedTotal; got != want {
t.Fatalf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
}
}
maxHourlySeries := 1
maxDailySeries := 0 // No limit
f("HourlyLimitExceeded", maxHourlySeries, maxDailySeries)
maxHourlySeries = 0 // No limit
maxDailySeries = 1
f("DailyLimitExceeded", maxHourlySeries, maxDailySeries)
}
// testCountAllMetricNames is a test helper function that counts the names of
// all time series within the given time range.
func testCountAllMetricNames(s *Storage, tr TimeRange) int {
tfsAll := NewTagFilters()
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
}
names, err := s.SearchMetricNames(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("SearchMetricNames() failed unexpectedly: %v", err))
}
return len(names)
}
lib/storage: properly handle maxMetrics limit at metricID search `TL;DR` This PR improves the metric IDs search in IndexDB: - Avoid seaching for metric IDs twice when `maxMetrics` limit is exceeded - Use correct error type for indicating that the `maxMetrics` limit is exceded - Simplify the logic of deciding between per-day and global index search A unit test has been added to ensure that this refactoring does not break anything. --- Function calls before the fix: ``` idb.searchMetricIDs |__ is.searchMetricIDs |__ is.searchMetricIDsInternal |__ is.updateMetricIDsForTagFilters |__ is.tryUpdatingMetricIDsForDateRange | | |__ is.getMetricIDsForDateAndFilters ``` - `searchMetricIDsInternal` searches metric IDs for each filter set. It maintains a metric ID set variable which is updated every time the `updateMetricIDsForTagFilters` function is called. After each successful call, the function checks the length of the updated metric ID set and if it is greater than `maxMetrics`, the function returns `too many timeseries` error. - `updateMetricIDsForTagFilters` uses either per-day or global index to search metric IDs for the given filter set. The decision of which index to use is made is made within the `tryUpdatingMetricIDsForDateRange` function and if it returns `fallback to global search` error then the function uses global index by calling `getMetricIDsForDateAndFilters` with zero date. - `tryUpdatingMetricIDsForDateRange` first checks if the given time range is larger than 40 days and if so returns `fallback to global search` error. Otherwise it proceeds to searching for metric IDs within that time range by calling `getMetricIDsForDateAndFilters` for each date. - `getMetricIDsForDateAndFilters` searches for metric IDs for the given date and returns `fallback to global search` error if the number of found metric IDs is greater than `maxMetrics`. Problems with this solution: 1. The `fallback to global search` error returned by `getMetricIDsForDateAndFilters` in case when maxMetrics is exceeded is misleading. 2. If `tryUpdatingMetricIDsForDateRange` proceeds to date range search and returns `fallback to global search` error (because `getMetricIDsForDateAndFilters` returns it) then this will trigger global search in `updateMetricIDsForTagFilters`. However the global search uses the same maxMetrics value which means this search is destined to fail too. I.e. the same search is performed twice and fails twice. 3. `too many timeseries` error is already handled in `searchMetricIDsInternal` and therefore handing this error in `updateMetricIDsForTagFilters` is redundant 4. updateMetricIDsForTagFilters is a better place to make a decision on whether to use per-day or global index. Solution: 1. Use a dedicated error for `too many timeseries` case 2. Handle `too many timeseries` error in `searchMetricIDsInternal` only 3. Move the per-day or global search decision from `tryUpdatingMetricIDsForDateRange` to `updateMetricIDsForTagFilters` and remove `fallback to global search` error. --------- Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com> Co-authored-by: Nikolay <nik@victoriametrics.com>
2024-08-27 19:39:03 +00:00
func TestStorageSearchMetricNames_TooManyTimeseries(t *testing.T) {
defer testRemoveAll(t)
const (
numDays = 100
numRows = 10
)
rng := rand.New(rand.NewSource(1))
var (
days []TimeRange
mrs []MetricRow
)
for i := range numDays {
day := TimeRange{
MinTimestamp: time.Date(2000, 1, i+1, 0, 0, 0, 0, time.UTC).UnixMilli(),
MaxTimestamp: time.Date(2000, 1, i+1, 23, 59, 59, 999, time.UTC).UnixMilli(),
}
days = append(days, day)
prefix1 := fmt.Sprintf("metric1_%d", i)
mrs = append(mrs, testGenerateMetricRowsWithPrefix(rng, numRows, prefix1, day)...)
prefix2 := fmt.Sprintf("metric2_%d", i)
mrs = append(mrs, testGenerateMetricRowsWithPrefix(rng, numRows, prefix2, day)...)
}
type options struct {
path string
filters []string
tr TimeRange
maxMetrics int
wantErr bool
wantCount int
}
f := func(opts *options) {
t.Helper()
s := MustOpenStorage(t.Name()+"/"+opts.path, 0, 0, 0)
defer s.MustClose()
s.AddRows(mrs, defaultPrecisionBits)
s.DebugFlush()
var tfss []*TagFilters
for _, filter := range opts.filters {
filter := fmt.Sprintf("%s.*", filter)
tfs := NewTagFilters()
if err := tfs.Add(nil, []byte(filter), false, true); err != nil {
t.Fatalf("unexpected error in TagFilters.Add: %v", err)
}
tfss = append(tfss, tfs)
}
names, err := s.SearchMetricNames(nil, tfss, opts.tr, opts.maxMetrics, noDeadline)
gotErr := err != nil
if gotErr != opts.wantErr {
t.Errorf("SeachMetricNames(%v, %v, %d): unexpected error: got %v, want error to happen %v", []any{
tfss, &opts.tr, opts.maxMetrics, err, opts.wantErr}...)
}
if got := len(names); got != opts.wantCount {
t.Errorf("SeachMetricNames(%v, %v, %d): unexpected metric name count: got %d, want %d", []any{
tfss, &opts.tr, opts.maxMetrics, got, opts.wantCount}...)
}
}
// Using one filter to search metric names within one day. The maxMetrics
// param is set to match exactly the number of time series that match the
// filter within that time range. Search operation must complete
// successfully.
f(&options{
path: "OneDay/OneTagFilter/MaxMetricsNotExeeded",
filters: []string{"metric1"},
tr: days[0],
maxMetrics: numRows,
wantCount: numRows,
})
// Using one filter to search metric names within one day. The maxMetrics
// param is less than the number of time series that match the filter
// within that time range. Search operation must fail.
f(&options{
path: "OneDay/OneTagFilter/MaxMetricsExeeded",
filters: []string{"metric1"},
tr: days[0],
maxMetrics: numRows - 1,
wantErr: true,
})
// Using two filters to search metric names within one day. The maxMetrics
// param is set to match exactly the number of time series that match the
// two filters within that time range. Search operation must complete
// successfully.
f(&options{
path: "OneDay/TwoTagFilters/MaxMetricsNotExeeded",
filters: []string{"metric1", "metric2"},
tr: days[0],
maxMetrics: numRows * 2,
wantCount: numRows * 2,
})
// Using two filters to search metric names within one day. The maxMetrics
// param is less than the number of time series that match the two filters
// within that time range. Search operation must fail.
f(&options{
path: "OneDay/TwoTagFilters/MaxMetricsExeeded",
filters: []string{"metric1", "metric2"},
tr: days[0],
maxMetrics: numRows*2 - 1,
wantErr: true,
})
// Using one filter to search metric names within two days. The maxMetrics
// param is set to match exactly the number of time series that match the
// filter within that time range. Search operation must complete
// successfully.
f(&options{
path: "TwoDays/OneTagFilter/MaxMetricsNotExeeded",
filters: []string{"metric1"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[1].MaxTimestamp,
},
maxMetrics: numRows * 2,
wantCount: numRows * 2,
})
// Using one filter to search metric names within two days. The maxMetrics
// param is less than the number of time series that match the filter
// within that time range. Search operation must fail.
f(&options{
path: "TwoDays/OneTagFilter/MaxMetricsExeeded",
filters: []string{"metric1"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[1].MaxTimestamp,
},
maxMetrics: numRows*2 - 1,
wantErr: true,
})
// Using two filters to search metric names within two days. The maxMetrics
// param is set to match exactly the number of time series that match the
// two filters within that time range. Search operation must complete
// successfully.
f(&options{
path: "TwoDays/TwoTagFilters/MaxMetricsNotExeeded",
filters: []string{"metric1", "metric2"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[1].MaxTimestamp,
},
maxMetrics: numRows * 4,
wantCount: numRows * 4,
})
// Using two filters to search metric names within two days. The maxMetrics
// param is less than the number of time series that match the two filters
// within that time range. Search operation must fail.
f(&options{
path: "TwoDays/TwoTagFilters/MaxMetricsExeeded",
filters: []string{"metric1", "metric2"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[1].MaxTimestamp,
},
maxMetrics: numRows*4 - 1,
wantErr: true,
})
// Using one filter to search metric names within the time range of 41 days.
// This time range corresponds to the day difference of 40 days, which is
// the max day difference when the per-day index is still used for
// searching. The maxMetrics param is set to match exactly the number of
// time series that match the filter within that time range. Search
// operation must complete successfully.
f(&options{
path: "40Days/OneTagFilter/MaxMetricsNotExeeded",
filters: []string{"metric1"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[40].MaxTimestamp,
},
maxMetrics: numRows * 41,
wantCount: numRows * 41,
})
// Using one filter to search metric names within the time range of 42 days.
// This time range corresponds to the day difference of 41 days, which is
// longer than than 40 days. In this case, the search is performed using
// global index instead of per-day index and the metric names will be
// searched within the entire retention period. The maxMetrics parameter,
// however, is set to the number of time series within the 42 days. The
// search must fail because the number of metrics will be much larger.
f(&options{
path: "MoreThan40Days/OneTagFilter/MaxMetricsExeeded",
filters: []string{"metric1"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[41].MaxTimestamp,
},
maxMetrics: numRows * 42,
wantErr: true,
})
// To fix the above case, the maxMetrics must be adjusted to be not less
// than the number of time series within the entire retention period.
f(&options{
path: "MoreThan40Days/OneTagFilter/MaxMetricsNotExeeded",
filters: []string{"metric1"},
tr: TimeRange{
MinTimestamp: days[0].MinTimestamp,
MaxTimestamp: days[41].MaxTimestamp,
},
maxMetrics: numRows * numDays,
wantCount: numRows * numDays,
})
}
// testCountAllMetricIDs is a test helper function that counts the IDs of
// all time series within the given time range.
func testCountAllMetricIDs(s *Storage, tr TimeRange) int {
tfsAll := NewTagFilters()
if err := tfsAll.Add([]byte("__name__"), []byte(".*"), false, true); err != nil {
panic(fmt.Sprintf("unexpected error in TagFilters.Add: %v", err))
}
ids, err := s.idb().searchMetricIDs(nil, []*TagFilters{tfsAll}, tr, 1e9, noDeadline)
if err != nil {
panic(fmt.Sprintf("seachMetricIDs() failed unexpectedly: %s", err))
}
return len(ids)
}
func TestStorageRegisterMetricNamesForVariousDataPatternsConcurrently(t *testing.T) {
testStorageVariousDataPatternsConcurrently(t, true, func(s *Storage, mrs []MetricRow) {
s.RegisterMetricNames(nil, mrs)
})
}
func TestStorageAddRowsForVariousDataPatternsConcurrently(t *testing.T) {
testStorageVariousDataPatternsConcurrently(t, false, func(s *Storage, mrs []MetricRow) {
s.AddRows(mrs, defaultPrecisionBits)
})
}
// testStorageVariousDataPatternsConcurrently tests different concurrency use
// cases when ingesting data of different patterns.
//
// The function is intended to be used by other tests that define which
// operation (AddRows or RegisterMetricNames) is tested.
func testStorageVariousDataPatternsConcurrently(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow)) {
defer testRemoveAll(t)
const concurrency = 4
t.Run("serial", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, 1, false)
})
t.Run("concurrentRows", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, concurrency, true)
})
t.Run("concurrentBatches", func(t *testing.T) {
testStorageVariousDataPatterns(t, registerOnly, op, concurrency, false)
})
}
// testStorageVariousDataPatterns tests the ingestion of different combinations
// of metric names and dates.
//
// The function is intended to be used by other tests that define the
// concurrency and the operation (AddRows or RegisterMetricNames) under test.
func testStorageVariousDataPatterns(t *testing.T, registerOnly bool, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool) {
f := func(t *testing.T, sameBatchMetricNames, sameRowMetricNames, sameBatchDates, sameRowDates bool) {
batches, wantCounts := testGenerateMetricRowBatches(&batchOptions{
numBatches: 4,
numRowsPerBatch: 100,
registerOnly: registerOnly,
sameBatchMetricNames: sameBatchMetricNames,
sameRowMetricNames: sameRowMetricNames,
sameBatchDates: sameBatchDates,
sameRowDates: sameRowDates,
})
strict := concurrency == 1
rowsAddedTotal := wantCounts.metrics.RowsAddedTotal
s := MustOpenStorage(t.Name(), 0, 0, 0)
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
assertCounts(t, s, wantCounts, strict)
// Rotate indexDB to test the case when TSIDs from tsidCache have the
// generation that is older than the generation of the current indexDB.
s.mustRotateIndexDB(time.Now())
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
wantCounts.metrics.RowsAddedTotal += rowsAddedTotal
assertCounts(t, s, wantCounts, strict)
// Empty the tsidCache to test the case when tsid is retrived from the
// index that belongs to the current generation indexDB.
s.resetAndSaveTSIDCache()
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
wantCounts.metrics.RowsAddedTotal += rowsAddedTotal
assertCounts(t, s, wantCounts, strict)
// Empty the tsidCache and rotate indexDB to test the case when tsid is
// retrived from the index that belongs to the previous generation
// indexDB.
s.resetAndSaveTSIDCache()
s.mustRotateIndexDB(time.Now())
testDoConcurrently(s, op, concurrency, splitBatches, batches)
s.DebugFlush()
wantCounts.metrics.RowsAddedTotal += rowsAddedTotal
assertCounts(t, s, wantCounts, strict)
s.MustClose()
}
t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-01
// Batch2: metric 1971-01-01, metric 1971-01-01
t.Parallel()
f(t, true, true, true, true)
})
t.Run("sameBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-02
// Batch2: metric 1971-01-01, metric 1971-01-02
t.Parallel()
f(t, true, true, true, false)
})
t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-01
// Batch2: metric 1971-01-02, metric 1971-01-02
t.Parallel()
f(t, true, true, false, true)
})
t.Run("sameBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric 1971-01-01, metric 1971-01-02
// Batch2: metric 1971-01-03, metric 1971-01-04
t.Parallel()
f(t, true, true, false, false)
})
t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
// Batch2: metric_row0 1971-01-01, metric_row1 1971-01-01
t.Parallel()
f(t, true, false, true, true)
})
t.Run("sameBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
// Batch2: metric_row0 1971-01-01, metric_row1 1971-01-02
t.Parallel()
f(t, true, false, true, false)
})
t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-01
// Batch2: metric_row0 1971-01-02, metric_row1 1971-01-02
t.Parallel()
f(t, true, false, false, true)
})
t.Run("sameBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_row0 1971-01-01, metric_row1 1971-01-02
// Batch2: metric_row0 1971-01-03, metric_row1 1971-01-04
t.Parallel()
f(t, true, false, false, false)
})
t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
// Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-01
t.Parallel()
f(t, false, true, true, true)
})
t.Run("diffBatchMetrics/sameRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
// Batch2: metric_batch1 1971-01-01, metric_batch1 1971-01-02
t.Parallel()
f(t, false, true, true, false)
})
t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-01
// Batch2: metric_batch1 1971-01-02, metric_batch1 1971-01-02
t.Parallel()
f(t, false, true, false, true)
})
t.Run("diffBatchMetrics/sameRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0 1971-01-01, metric_batch0 1971-01-02
// Batch2: metric_batch1 1971-01-03, metric_batch1 1971-01-04
t.Parallel()
f(t, false, true, false, false)
})
t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
// Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-01
t.Parallel()
f(t, false, false, true, true)
})
t.Run("diffBatchMetrics/diffRowMetrics/sameBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
// Batch2: metric_batch1_row0 1971-01-01, metric_batch1_row1 1971-01-02
t.Parallel()
f(t, false, false, true, false)
})
t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/sameRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-01
// Batch2: metric_batch1_row0 1971-01-02, metric_batch1_row1 1971-01-02
t.Parallel()
f(t, false, false, false, true)
})
t.Run("diffBatchMetrics/diffRowMetrics/diffBatchDates/diffRowDates", func(t *testing.T) {
// Batch1: metric_batch0_row0 1971-01-01, metric_batch0_row1 1971-01-02
// Batch2: metric_batch1_row0 1971-01-03, metric_batch1_row1 1971-01-04
t.Parallel()
f(t, false, false, false, false)
})
}
// testDoConcurrently performs some storage operation on metric rows
// concurrently.
//
// The function accepts metric rows organized in batches. The number of
// goroutines is specified with concurrency arg. If splitBatches is false, then
// each batch is processed in a separate goroutine. Otherwise, rows from a
// single batch are spread across multiple goroutines and next batch won't be
// processed until all records of the current batch are processed.
func testDoConcurrently(s *Storage, op func(s *Storage, mrs []MetricRow), concurrency int, splitBatches bool, mrsBatches [][]MetricRow) {
if concurrency < 1 {
panic(fmt.Sprintf("Unexpected concurrency: got %d, want >= 1", concurrency))
}
var wg sync.WaitGroup
mrsCh := make(chan []MetricRow)
for range concurrency {
wg.Add(1)
go func() {
for mrs := range mrsCh {
op(s, mrs)
}
wg.Done()
}()
}
n := 1
if splitBatches {
n = concurrency
}
for _, batch := range mrsBatches {
step := len(batch) / n
if step == 0 {
step = 1
}
for begin := 0; begin < len(batch); begin += step {
limit := begin + step
if limit > len(batch) {
limit = len(batch)
}
mrsCh <- batch[begin:limit]
}
}
close(mrsCh)
wg.Wait()
}
type counts struct {
metrics *Metrics
timeRangeCounts map[TimeRange]int
dateTSDBStatuses map[uint64]*TSDBStatus
}
// assertCounts retrieves various counts from storage and compares them with
// the wanted ones.
//
// Some counts can be greater than wanted values because duplicate metric IDs
// can be created when rows are inserted concurrently. In this case `strict`
// arg can be set to false in order to replace strict equality comparison with
// `greater or equal`.
func assertCounts(t *testing.T, s *Storage, want *counts, strict bool) {
t.Helper()
var gotMetrics Metrics
s.UpdateMetrics(&gotMetrics)
if got, want := gotMetrics.RowsAddedTotal, want.metrics.RowsAddedTotal; got != want {
t.Errorf("unexpected Metrics.RowsAddedTotal: got %d, want %d", got, want)
}
gotCnt, wantCnt := gotMetrics.NewTimeseriesCreated, want.metrics.NewTimeseriesCreated
if strict {
if gotCnt != wantCnt {
t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want %d", gotCnt, wantCnt)
}
} else {
if gotCnt < wantCnt {
t.Errorf("unexpected Metrics.NewTimeseriesCreated: got %d, want >= %d", gotCnt, wantCnt)
}
}
for tr, want := range want.timeRangeCounts {
if got := testCountAllMetricNames(s, tr); got != want {
t.Errorf("%v: unexpected metric name count: got %d, want %d", &tr, got, want)
}
got := testCountAllMetricIDs(s, tr)
if strict {
if got != want {
t.Errorf("%v: unexpected metric ID count: got %d, want %d", &tr, got, want)
}
} else {
if got < want {
t.Errorf("%v: unexpected metric ID count: got %d, want >= %d", &tr, got, want)
}
}
}
for date, wantStatus := range want.dateTSDBStatuses {
dt := time.UnixMilli(int64(date) * msecPerDay).UTC()
gotStatus, err := s.GetTSDBStatus(nil, nil, date, "", 10, 1e6, noDeadline)
if err != nil {
t.Fatalf("GetTSDBStatus(%v) failed unexpectedly: %v", dt, err)
}
got, want := gotStatus.TotalSeries, wantStatus.TotalSeries
if strict {
if got != want {
t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want %d", dt, got, want)
}
} else {
if got < want {
t.Errorf("%v: unexpected TSDBStatus.TotalSeries: got %d, want >= %d", dt, got, want)
}
}
}
}
type batchOptions struct {
numBatches int
numRowsPerBatch int
registerOnly bool
sameBatchMetricNames bool
sameRowMetricNames bool
sameBatchDates bool
sameRowDates bool
}
// testGenerateMetricRowBatches generates metric rows batches of various
// combinations of metric names and dates. The function also returns the counts
// that the storage is expected to report once the generated batch is ingested
// into the storage.
func testGenerateMetricRowBatches(opts *batchOptions) ([][]MetricRow, *counts) {
if opts.numBatches <= 0 {
panic(fmt.Sprintf("unexpected number of batches: got %d, want > 0", opts.numBatches))
}
if opts.numRowsPerBatch <= 0 {
panic(fmt.Sprintf("unexpected number of rows per batch: got %d, want > 0", opts.numRowsPerBatch))
}
rng := rand.New(rand.NewSource(1))
batches := make([][]MetricRow, opts.numBatches)
metricName := "metric"
startTime := time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC)
endTime := time.Date(1971, 1, 1, 23, 59, 59, 999, time.UTC)
days := time.Duration(0)
trNames := make(map[TimeRange]map[string]bool)
names := make(map[string]bool)
for batch := range opts.numBatches {
batchMetricName := metricName
if !opts.sameBatchMetricNames {
batchMetricName += fmt.Sprintf("_batch%d", batch)
}
var rows []MetricRow
for row := range opts.numRowsPerBatch {
rowMetricName := batchMetricName
if !opts.sameRowMetricNames {
rowMetricName += fmt.Sprintf("_row%d", row)
}
mn := MetricName{
MetricGroup: []byte(rowMetricName),
}
tr := TimeRange{
MinTimestamp: startTime.Add(days * 24 * time.Hour).UnixMilli(),
MaxTimestamp: endTime.Add(days * 24 * time.Hour).UnixMilli(),
}
rows = append(rows, MetricRow{
MetricNameRaw: mn.marshalRaw(nil),
Timestamp: rng.Int63n(tr.MaxTimestamp-tr.MinTimestamp) + tr.MinTimestamp,
Value: rng.NormFloat64() * 1e6,
})
if !opts.sameRowDates {
days++
}
if trNames[tr] == nil {
trNames[tr] = make(map[string]bool)
}
names[rowMetricName] = true
trNames[tr][rowMetricName] = true
}
batches[batch] = rows
if opts.sameBatchDates {
days = 0
} else if opts.sameRowDates {
days++
}
}
allTimeseries := len(names)
rowsAddedTotal := uint64(opts.numBatches * opts.numRowsPerBatch)
// When RegisterMetricNames() is called it only restisters the time series
// in IndexDB but no samples is written to the storage.
if opts.registerOnly {
rowsAddedTotal = 0
}
want := counts{
metrics: &Metrics{
RowsAddedTotal: rowsAddedTotal,
NewTimeseriesCreated: uint64(allTimeseries),
},
timeRangeCounts: make(map[TimeRange]int),
dateTSDBStatuses: make(map[uint64]*TSDBStatus),
}
for tr, names := range trNames {
count := len(names)
date := uint64(tr.MinTimestamp / msecPerDay)
want.timeRangeCounts[tr] = count
want.dateTSDBStatuses[date] = &TSDBStatus{
TotalSeries: uint64(count),
}
}
return batches, &want
}