lib/index: reduce read/write load after indexDB rotation (#2177)

* lib/index: reduce read/write load after indexDB rotation

IndexDB in VM is responsible for storing TSID - ID's used for identifying
time series. The index is stored on disk and used by both ingestion and read path.

IndexDB is stored separately to data parts and is global for all stored data.
It can't be deleted partially as VM deletes data parts. Instead, indexDB is
rotated once in `retention` interval.

The rotation procedure means that `current` indexDB becomes `previous`,
and new freshly created indexDB struct becomes `current`. So in any time,
VM holds indexDB for current and previous retention periods.
When time series is ingested or queried, VM checks if its TSID is present
in `current` indexDB. If it is missing, it checks the `previous` indexDB.
If TSID was found, it gets copied to the `current` indexDB. In this way
`current` indexDB stores only series which were active during the retention
period.

To improve indexDB lookups, VM uses a cache layer called `tsidCache`. Both
write and read path consult `tsidCache` and on miss the relad lookup happens.

When rotation happens, VM resets the `tsidCache`. This is needed for ingestion
path to trigger `current` indexDB re-population. Since index re-population
requires additional resources, every index rotation event may cause some extra
load on CPU and disk. While it may be unnoticeable for most of the cases,
for systems with very high number of unique series each rotation may lead
to performance degradation for some period of time.

This PR makes an attempt to smooth out resource usage after the rotation.
The changes are following:
1. `tsidCache` is no longer reset after the rotation;
2. Instead, each entry in `tsidCache` gains a notion of indexDB to which
they belong;
3. On ingestion path after the rotation we check if requested TSID was
found in `tsidCache`. Then we have 3 branches:
3.1 Fast path. It was found, and belongs to the `current` indexDB. Return TSID.
3.2 Slow path. It wasn't found, so we generate it from scratch,
add to `current` indexDB, add it to `tsidCache`.
3.3 Smooth path. It was found but does not belong to the `current` indexDB.
In this case, we add it to the `current` indexDB with some probability.
The probability is based on time passed since the last rotation with some threshold.
The more time has passed since rotation the higher is chance to re-populate `current` indexDB.
The default re-population interval in this PR is set to `1h`, during which entries from
`previous` index supposed to slowly re-populate `current` index.

The new metric `vm_timeseries_repopulated_total` was added to identify how many TSIDs
were moved from `previous` indexDB to the `current` indexDB. This metric supposed to
grow only during the first `1h` after the last rotation.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* wip

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-02-12 00:30:08 +02:00 committed by GitHub
parent 08428464e9
commit cf1a8bce6b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 286 additions and 68 deletions

View file

@ -463,6 +463,9 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
return float64(idbm().NewTimeseriesCreated)
})
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(idbm().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
return float64(idbm().MissingTSIDsForMetricID)
})

View file

@ -14,6 +14,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: reduce CPU and disk IO usage during `indexdb` rotation once per `-retentionPeriod`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-dropSamplesOnOverload` command-line flag for `vminsert`. If this flag is set, then `vminsert` drops incoming data if the destination `vmstorage` is temporarily unavailable or cannot keep up with the ingestion rate. The number of dropped rows can be [monitored](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) via `vm_rpc_rows_dropped_on_overload_total` metric at `vminsert`.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve re-routing logic, so it re-routes incoming data more evenly if some of `vmstorage` nodes are temporarily unavailable and/or accept data at slower rate than other `vmstorage` nodes. Also significantly reduce possible re-routing storm when `vminsert` runs with `-disableRerouting=false` command-line flag. This should help the following issues: [one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1337), [two](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1165), [three](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1054), [four](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/791), [five](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1544).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): cover more cases with the [label filters' propagation optimization](https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization). This should improve the average performance for practical queries. The following cases are additionally covered:

View file

@ -8,6 +8,7 @@ import (
"io"
"path/filepath"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
@ -59,6 +60,9 @@ type indexDB struct {
// The counter for newly created time series. It can be used for determining time series churn rate.
newTimeseriesCreated uint64
// The counter for time series which were re-populated from previous indexDB after the rotation.
timeseriesRepopulated uint64
// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
@ -79,6 +83,13 @@ type indexDB struct {
mustDrop uint64
// generation identifies the index generation ID
// and is used for syncing items from different indexDBs
generation uint64
// The unix timestamp in seconds for the indexDB rotation.
rotationTimestamp uint64
name string
tb *mergeset.Table
@ -98,26 +109,38 @@ type indexDB struct {
indexSearchPool sync.Pool
}
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, s *Storage) (*indexDB, error) {
// openIndexDB opens index db from the given path.
//
// The last segment of the path should contain unique hex value which
// will be then used as indexDB.generation
//
// The rotationTimestamp must be set to the current unix timestamp when ipenIndexDB
// is called when creating new indexdb during indexdb rotation.
func openIndexDB(path string, s *Storage, rotationTimestamp uint64) (*indexDB, error) {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
name := filepath.Base(path)
gen, err := strconv.ParseUint(name, 16, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)
}
tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows)
if err != nil {
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
}
name := filepath.Base(path)
// Do not persist tagFiltersCache in files, since it is very volatile.
mem := memory.Allowed()
db := &indexDB{
refCount: 1,
tb: tb,
name: name,
refCount: 1,
generation: gen,
rotationTimestamp: rotationTimestamp,
tb: tb,
name: name,
tagFiltersCache: workingsetcache.New(mem/32, time.Hour),
s: s,
@ -141,6 +164,7 @@ type IndexDBMetrics struct {
IndexDBRefCount uint64
NewTimeseriesCreated uint64
TimeseriesRepopulated uint64
MissingTSIDsForMetricID uint64
RecentHourMetricIDsSearchCalls uint64
@ -182,6 +206,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
@ -339,6 +364,33 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.s.metricNameCache.Set(key[:], metricName)
}
// maybeCreateIndexes probabilistically creates indexes for the given (tsid, metricNameRaw) at db.
//
// The probability increases from 0 to 100% during the first hour since db rotation.
//
// It returns true if new index entry was created, and false if it was skipped.
func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) {
h := xxhash.Sum64(metricNameRaw)
p := float64(uint32(h)) / (1 << 32)
pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600
if p > pMin {
// Fast path: there is no need creating indexes for metricNameRaw yet.
return false, nil
}
// Slow path: create indexes for (tsid, metricNameRaw) at db.
mn := GetMetricName()
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
}
mn.sortTags()
if err := db.createIndexes(tsid, mn); err != nil {
return false, err
}
PutMetricName(mn)
atomic.AddUint64(&db.timeseriesRepopulated, 1)
return true, nil
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
prefix := ^uint64(0)
if versioned {
@ -499,7 +551,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
}
if err := db.generateTSID(dst, metricName, mn); err != nil {
created, err := db.getOrCreateTSID(dst, metricName, mn)
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
if err := db.createIndexes(dst, mn); err != nil {
@ -508,9 +561,13 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable.
atomic.AddUint64(&db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
if created {
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
atomic.AddUint64(&db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
}
return nil
}
@ -524,7 +581,10 @@ func SetLogNewSeries(ok bool) {
var logNewSeries = false
func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) error {
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
//
// Returns true if TSID was created or false if TSID was in extDB
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
// Search the TSID in the external storage.
// This is usually the db from the previous period.
var err error
@ -533,15 +593,19 @@ func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) er
}) {
if err == nil {
// The TSID has been found in the external storage.
return nil
return false, nil
}
if err != io.EOF {
return fmt.Errorf("external search failed: %w", err)
return false, fmt.Errorf("external search failed: %w", err)
}
}
// The TSID wasn't found in the external storage.
// Generate it locally.
generateTSID(dst, mn)
return true, nil
}
func generateTSID(dst *TSID, mn *MetricName) {
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
@ -550,7 +614,6 @@ func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) er
dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
}
dst.MetricID = generateUniqueMetricID()
return nil
}
func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {

View file

@ -9,6 +9,7 @@ import (
"reflect"
"regexp"
"sort"
"sync/atomic"
"testing"
"time"
@ -18,6 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
)
func TestReverseBytes(t *testing.T) {
@ -457,15 +459,15 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
func TestIndexDBOpenClose(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ {
db, err := openIndexDB("test-index-db", s)
db, err := openIndexDB(tableName, s, 0)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
db.MustClose()
}
if err := os.RemoveAll("test-index-db"); err != nil {
if err := os.RemoveAll(tableName); err != nil {
t.Fatalf("cannot remove indexDB: %s", err)
}
}
@ -477,8 +479,8 @@ func TestIndexDB(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-serial"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -508,7 +510,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected.
db.MustClose()
db, err = openIndexDB(dbName, s)
db, err = openIndexDB(dbName, s, 0)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -527,8 +529,8 @@ func TestIndexDB(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-concurrent"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1485,12 +1487,113 @@ func TestMatchTagFilters(t *testing.T) {
}
}
func TestIndexDBRepopulateAfterRotation(t *testing.T) {
path := "TestIndexRepopulateAfterRotation"
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
s.retentionMsecs = msecsPerMonth
defer func() {
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}()
db := s.idb()
if db.generation == 0 {
t.Fatalf("expected indexDB generation to be not 0")
}
const metricRowsN = 1000
// use min-max timestamps of 1month range to create smaller number of partitions
timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now()
mrs := testGenerateMetricRows(metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli())
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.DebugFlush()
// verify the storage contains rows.
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < uint64(metricRowsN) {
t.Fatalf("expecting at least %d rows in the table; got %d", metricRowsN, m.TableMetrics.SmallRowsCount)
}
// check new series were registered in indexDB
added := atomic.LoadUint64(&db.newTimeseriesCreated)
if added != metricRowsN {
t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added)
}
// check new series were added to cache
var cs fastcache.Stats
s.tsidCache.UpdateStats(&cs)
if cs.EntriesCount != metricRowsN {
t.Fatalf("expected tsidCache to contain %d rows; got %d", metricRowsN, cs.EntriesCount)
}
// check if cache entries do belong to current indexDB generation
var genTSID generationTSID
for _, mr := range mrs {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
if genTSID.generation != db.generation {
t.Fatalf("expected all entries in tsidCache to have the same indexDB generation: %d;"+
"got %d", db.generation, genTSID.generation)
}
}
prevGeneration := db.generation
// force index rotation
s.mustRotateIndexDB()
// check tsidCache wasn't reset after the rotation
var cs2 fastcache.Stats
s.tsidCache.UpdateStats(&cs2)
if cs.EntriesCount != metricRowsN {
t.Fatalf("expected tsidCache after rotation to contain %d rows; got %d", metricRowsN, cs2.EntriesCount)
}
dbNew := s.idb()
if dbNew.generation == 0 {
t.Fatalf("expected new indexDB generation to be not 0")
}
if dbNew.generation == prevGeneration {
t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", dbNew.generation)
}
// Re-insert rows again and verify that entries belong prevGeneration and dbNew.generation,
// while the majority of entries remain at prevGeneration.
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.DebugFlush()
entriesByGeneration := make(map[uint64]int)
for _, mr := range mrs {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
entriesByGeneration[genTSID.generation]++
}
if len(entriesByGeneration) > 2 {
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
}
prevEntries := entriesByGeneration[prevGeneration]
currEntries := entriesByGeneration[dbNew.generation]
totalEntries := prevEntries + currEntries
if totalEntries != metricRowsN {
t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN)
}
if float64(currEntries)/float64(totalEntries) > 0.1 {
t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
}
}
func TestSearchTSIDWithTimeRange(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-ts-range"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}

View file

@ -43,8 +43,8 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-add-tsids"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -104,8 +104,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-head-posting-for-matchers"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -279,8 +279,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-get-tsids"
db, err := openIndexDB(dbName, s)
dbName := nextIndexDBTableName()
db, err := openIndexDB(dbName, s, 0)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}

View file

@ -155,8 +155,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
path: path,
cachePath: path + "/cache",
retentionMsecs: retentionMsecs,
stop: make(chan struct{}),
stop: make(chan struct{}),
}
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, fmt.Errorf("cannot create a directory for the storage at %q: %w", path, err)
@ -692,7 +691,8 @@ func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
idbNewPath := s.path + "/indexdb/" + newTableName
idbNew, err := openIndexDB(idbNewPath, s)
rotationTimestamp := fasttime.UnixTimestamp()
idbNew, err := openIndexDB(idbNewPath, s, rotationTimestamp)
if err != nil {
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
}
@ -711,8 +711,9 @@ func (s *Storage) mustRotateIndexDB() {
// Persist changes on the file system.
fs.MustSyncPath(s.path)
// Flush tsidCache, so idbNew can be populated with fresh data.
s.resetAndSaveTSIDCache()
// Do not flush tsidCache to avoid read/write path slowdown
// and slowly re-populate new idb with entries from the cache via maybeCreateIndexes().
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s.dateMetricIDCache.Reset()
@ -1627,17 +1628,32 @@ var (
// Th MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
var (
tsid TSID
metricName []byte
)
var genTSID generationTSID
mn := GetMetricName()
defer PutMetricName(mn)
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
for i := range mrs {
mr := &mrs[i]
if s.getTSIDFromCache(&tsid, mr.MetricNameRaw) {
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if genTSID.generation != idb.generation {
// The found entry is from the previous cache generation
// so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw)
if err != nil {
return fmt.Errorf("cannot create indexes in the current indexdb: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
}
// Fast path - mr.MetricNameRaw has been already registered.
continue
}
@ -1648,14 +1664,14 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil {
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName); err != nil {
return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err)
}
s.putTSIDToCache(&tsid, mr.MetricNameRaw)
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
// Register the metric in per-day inverted index.
date := uint64(mr.Timestamp) / msecPerDay
metricID := tsid.MetricID
metricID := genTSID.TSID.MetricID
if s.dateMetricIDCache.Has(date, metricID) {
// Fast path: the metric has been already registered in per-day inverted index
continue
@ -1690,6 +1706,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
)
var pmrs *pendingMetricRows
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
var genTSID generationTSID
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
for i := range mrs {
@ -1734,7 +1753,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
r.TSID = prevTSID
continue
}
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
r.TSID = genTSID.TSID
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j--
@ -1746,6 +1766,20 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
// See Storage.DeleteMetrics code for details.
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if genTSID.generation != idb.generation {
// The found entry is from the previous cache generation
// so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw)
if err != nil {
return fmt.Errorf("cannot create indexes in the current indexdb: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
}
continue
}
@ -1805,7 +1839,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
j--
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
@ -2342,13 +2378,20 @@ type hourMetricIDs struct {
isFull bool
}
func (s *Storage) getTSIDFromCache(dst *TSID, metricName []byte) bool {
type generationTSID struct {
TSID TSID
// generation stores the indexdb.generation value to identify to which indexdb belongs this TSID
generation uint64
}
func (s *Storage) getTSIDFromCache(dst *generationTSID, metricName []byte) bool {
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))[:]
buf = s.tsidCache.Get(buf[:0], metricName)
return uintptr(len(buf)) == unsafe.Sizeof(*dst)
}
func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) {
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))[:]
s.tsidCache.Set(metricName, buf)
}
@ -2412,12 +2455,12 @@ func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error
// Open the last two tables.
currPath := path + "/" + tableNames[len(tableNames)-1]
curr, err = openIndexDB(currPath, s)
curr, err = openIndexDB(currPath, s, 0)
if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
}
prevPath := path + "/" + tableNames[len(tableNames)-2]
prev, err = openIndexDB(prevPath, s)
prev, err = openIndexDB(prevPath, s, 0)
if err != nil {
curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)

View file

@ -918,37 +918,42 @@ func TestStorageAddRowsConcurrent(t *testing.T) {
}
}
func testGenerateMetricRows(rows uint64, timestampMin, timestampMax int64) []MetricRow {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for i := 0; i < int(rows); i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
metricNameRaw := mn.marshalRaw(nil)
timestamp := rand.Int63n(timestampMax-timestampMin) + timestampMin
value := rand.NormFloat64() * 1e6
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
return mrs
}
func testStorageAddRows(s *Storage) error {
const rowsPerAdd = 1e3
const addsCount = 10
for i := 0; i < addsCount; i++ {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("webservice")},
{[]byte("instance"), []byte("1.2.3.4")},
}
for j := 0; j < rowsPerAdd; j++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", rand.Intn(100)))
metricNameRaw := mn.marshalRaw(nil)
timestamp := rand.Int63n(1e10)
value := rand.NormFloat64() * 1e6
mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}
mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
return fmt.Errorf("unexpected error when adding mrs: %w", err)
}
}
// Verify the storage contains rows.
minRowsExpected := uint64(rowsPerAdd) * addsCount
minRowsExpected := uint64(rowsPerAdd * addsCount)
var m Metrics
s.UpdateMetrics(&m)
if m.TableMetrics.SmallRowsCount < minRowsExpected {