VictoriaMetrics/lib/storage/index_db_timing_test.go
Roman Khavronenko d107f86fbc
lib/index: reduce read/write load after indexDB rotation (#2177)
* lib/index: reduce read/write load after indexDB rotation

IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.

IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.

The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.

To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.

When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.

This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.

The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2022-02-12 00:34:44 +02:00

352 lines
10 KiB
Go

package storage
import (
"fmt"
"os"
"regexp"
"strconv"
"sync/atomic"
"testing"
"time"
)
func BenchmarkRegexpFilterMatch(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
re := regexp.MustCompile(`.*foo-bar-baz.*`)
b := []byte("fdsffd foo-bar-baz assd fdsfad dasf dsa")
for pb.Next() {
if !re.Match(b) {
panic("BUG: regexp must match!")
}
b[0]++
}
})
}
func BenchmarkRegexpFilterMismatch(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
re := regexp.MustCompile(`.*foo-bar-baz.*`)
b := []byte("fdsffd foo-bar sfddsf assd nmn,mfdsdsakj")
for pb.Next() {
if re.Match(b) {
panic("BUG: regexp mustn't match!")
}
b[0]++
}
})
}
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const recordsPerLoop = 1e3
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
var goroutineID uint32
b.ReportAllocs()
b.SetBytes(recordsPerLoop)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var mn MetricName
var tsid TSID
mn.AccountID = atomic.AddUint32(&goroutineID, 1)
// The most common tags.
mn.Tags = []Tag{
{
Key: []byte("job"),
},
{
Key: []byte("instance"),
},
}
startOffset := 0
for pb.Next() {
benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop)
startOffset += recordsPerLoop
}
})
b.StopTimer()
}
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsPerLoop; i++ {
mn.MetricGroup = strconv.AppendUint(mn.MetricGroup[:0], uint64(i+startOffset), 10)
for j := range mn.Tags {
mn.Tags[j].Value = strconv.AppendUint(mn.Tags[j].Value[:0], uint64(i*j), 16)
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(tsid, metricName); err != nil {
panic(fmt.Errorf("cannot insert record: %w", err))
}
}
}
func BenchmarkHeadPostingForMatchers(b *testing.B) {
// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
// Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66
const accountID = 34327843
const projectID = 893433
var mn MetricName
var metricName []byte
var tsid TSID
addSeries := func(kvs ...string) {
mn.Reset()
for i := 0; i < len(kvs); i += 2 {
mn.AddTag(kvs[i], kvs[i+1])
}
mn.sortTags()
mn.AccountID = accountID
mn.ProjectID = projectID
metricName = mn.Marshal(metricName[:0])
if err := db.createTSIDByName(&tsid, metricName); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
}
for n := 0; n < 10; n++ {
ns := strconv.Itoa(n)
for i := 0; i < 100000; i++ {
is := strconv.Itoa(i)
addSeries("i", is, "n", ns, "j", "foo")
// Have some series that won't be matched, to properly test inverted matches.
addSeries("i", is, "n", ns, "j", "bar")
addSeries("i", is, "n", "0_"+ns, "j", "bar")
addSeries("i", is, "n", "1_"+ns, "j", "bar")
addSeries("i", is, "n", "2_"+ns, "j", "foo")
}
}
// Make sure all the items can be searched.
db.tb.DebugFlush()
b.ResetTimer()
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
is := db.getIndexSearch(tfs.accountID, tfs.projectID, noDeadline)
defer db.putIndexSearch(is)
tfss := []*TagFilters{tfs}
tr := TimeRange{
MinTimestamp: 0,
MaxTimestamp: timestampFromTime(time.Now()),
}
for i := 0; i < b.N; i++ {
metricIDs, err := is.searchMetricIDs(tfss, tr, 2e9)
if err != nil {
b.Fatalf("unexpected error in searchMetricIDs: %s", err)
}
if len(metricIDs) != expectedMetricIDs {
b.Fatalf("unexpected metricIDs found; got %d; want %d", len(metricIDs), expectedMetricIDs)
}
}
}
addTagFilter := func(tfs *TagFilters, key, value string, isNegative, isRegexp bool) {
if err := tfs.Add([]byte(key), []byte(value), isNegative, isRegexp); err != nil {
b.Fatalf("cannot add tag filter %q=%q, isNegative=%v, isRegexp=%v", key, value, isNegative, isRegexp)
}
}
b.Run(`n="1"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
benchSearch(b, tfs, 2e5)
})
b.Run(`n="1",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`j="foo",n="1"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "j", "foo", false, false)
addTagFilter(tfs, "n", "1", false, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`n="1",j!="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "j", "foo", true, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`i=~".*"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "i", ".*", false, true)
benchSearch(b, tfs, 0)
})
b.Run(`i=~".+"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "i", ".+", false, true)
benchSearch(b, tfs, 5e6)
})
b.Run(`i=~""`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "i", "", false, true)
benchSearch(b, tfs, 0)
})
b.Run(`i!=""`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "i", "", true, false)
benchSearch(b, tfs, 5e6)
})
b.Run(`n="1",i=~".*",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", ".*", false, true)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`n="1",i=~".*",i!="2",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", ".*", false, true)
addTagFilter(tfs, "i", "2", true, false)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5-1)
})
b.Run(`n="1",i!=""`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", "", true, false)
benchSearch(b, tfs, 2e5)
})
b.Run(`n="1",i!="",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", "", true, false)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`n="1",i=~".+",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", ".+", false, true)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5)
})
b.Run(`n="1",i=~"1.+",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", "1.+", false, true)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 11110)
})
b.Run(`n="1",i=~".+",i!="2",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", ".+", false, true)
addTagFilter(tfs, "i", "2", true, false)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 1e5-1)
})
b.Run(`n="1",i=~".+",i!~"2.*",j="foo"`, func(b *testing.B) {
tfs := NewTagFilters(accountID, projectID)
addTagFilter(tfs, "n", "1", false, false)
addTagFilter(tfs, "i", ".+", false, true)
addTagFilter(tfs, "i", "2.*", true, true)
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 88889)
})
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
const recordsPerLoop = 1000
const accountsCount = 111
const projectsCount = 33333
const recordsCount = 1e5
// Fill the db with recordsCount records.
var mn MetricName
mn.MetricGroup = []byte("rps")
for i := 0; i < 2; i++ {
key := fmt.Sprintf("key_%d", i)
value := fmt.Sprintf("value_%d", i)
mn.AddTag(key, value)
}
var tsid TSID
var metricName []byte
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ {
mn.AccountID = uint32(i % accountsCount)
mn.ProjectID = uint32(i % projectsCount)
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
}
b.SetBytes(recordsPerLoop)
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var tsidLocal TSID
var metricNameLocal []byte
mnLocal := mn
is := db.getIndexSearch(0, 0, noDeadline)
defer db.putIndexSearch(is)
for pb.Next() {
for i := 0; i < recordsPerLoop; i++ {
mnLocal.AccountID = uint32(i % accountsCount)
mnLocal.ProjectID = uint32(i % projectsCount)
mnLocal.sortTags()
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal); err != nil {
panic(fmt.Errorf("cannot obtain tsid: %w", err))
}
}
}
})
b.StopTimer()
}