lib/storage: reset cache on disk during series deletion and during indexdb rotation

This should prevent from inconsistent behavior (aka partially missing data for some time series) after unclean shutdown.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347
This commit is contained in:
Aliaksandr Valialkin 2021-06-11 12:42:26 +03:00
parent d979e14da2
commit c4f3fbfa5d
5 changed files with 94 additions and 112 deletions

View file

@ -7,6 +7,7 @@ sort: 15
## tip
* BUGFIX: vmalert: fix recording rules, which were broken in v1.61.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1369).
* BUGFIX: reset the on-disk cache for mapping from the full metric name to an internal metric id (e.g. `metric_name{labels} -> internal_metric_id`) after deleting metrics via [delete API](https://docs.victoriametrics.com/#how-to-delete-time-series). This should prevent from possible inconsistent state after unclean shutdown. This [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347).
## [v1.61.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.0)

View file

@ -91,14 +91,8 @@ type indexDB struct {
// Cache for fast TagFilters -> TSIDs lookup.
tagCache *workingsetcache.Cache
// Cache for fast MetricID -> TSID lookup.
metricIDCache *workingsetcache.Cache
// Cache for fast MetricID -> MetricName lookup.
metricNameCache *workingsetcache.Cache
// Cache for fast MetricName -> TSID lookups.
tsidCache *workingsetcache.Cache
// The parent storage.
s *Storage
// Cache for useless TagFilters entries, which have no tag filters
// matching low number of metrics.
@ -118,21 +112,12 @@ type indexDB struct {
// metricIDs, since it usually requires 1 bit per deleted metricID.
deletedMetricIDs atomic.Value
deletedMetricIDsUpdateLock sync.Mutex
// The minimum timestamp when queries with composite index can be used.
minTimestampForCompositeIndex int64
}
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (*indexDB, error) {
if metricIDCache == nil {
logger.Panicf("BUG: metricIDCache must be non-nil")
}
if metricNameCache == nil {
logger.Panicf("BUG: metricNameCache must be non-nil")
}
if tsidCache == nil {
logger.Panicf("BUG: tsidCache must be nin-nil")
func openIndexDB(path string, s *Storage) (*indexDB, error) {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
@ -151,13 +136,9 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
name: name,
tagCache: workingsetcache.New(mem/32, time.Hour),
metricIDCache: metricIDCache,
metricNameCache: metricNameCache,
tsidCache: tsidCache,
s: s,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
minTimestampForCompositeIndex: minTimestampForCompositeIndex,
}
is := db.getIndexSearch(noDeadline)
@ -249,7 +230,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
m.MinTimestampForCompositeIndex = uint64(db.minTimestampForCompositeIndex)
m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex)
m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)
@ -323,9 +304,7 @@ func (db *indexDB) decRef() {
db.loopsPerDateTagFilterCache.Stop()
db.tagCache = nil
db.metricIDCache = nil
db.metricNameCache = nil
db.tsidCache = nil
db.s = nil
db.uselessTagFiltersCache = nil
db.loopsPerDateTagFilterCache = nil
@ -376,7 +355,7 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
// must be checked by the caller.
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
tmp := db.metricIDCache.Get(buf[:0], key[:])
tmp := db.s.metricIDCache.Get(buf[:0], key[:])
if len(tmp) == 0 {
// The TSID for the given metricID wasn't found in the cache.
return io.EOF
@ -390,19 +369,19 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
db.metricIDCache.Set(key[:], buf[:])
db.s.metricIDCache.Set(key[:], buf[:])
}
func (db *indexDB) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
return db.metricNameCache.Get(dst, key[:])
return db.s.metricNameCache.Get(dst, key[:])
}
func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
db.metricNameCache.Set(key[:], metricName)
db.s.metricNameCache.Set(key[:], metricName)
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
@ -1634,19 +1613,6 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
return nil
}
// Mark the found metricIDs as deleted.
items := getIndexItems()
for _, metricID := range metricIDs {
items.B = append(items.B, nsPrefixDeletedMetricID)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
err := db.tb.AddItems(items.Items)
putIndexItems(items)
if err != nil {
return err
}
// atomically add deleted metricIDs to an inmemory map.
dmis := &uint64set.Set{}
dmis.AddMulti(metricIDs)
@ -1656,11 +1622,25 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
invalidateTagCache()
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
db.tsidCache.Reset()
db.s.resetAndSaveTSIDCache()
// Do not reset uselessTagFiltersCache, since the found metricIDs
// on cache miss are filtered out later with deletedMetricIDs.
return nil
// Store the metricIDs as deleted.
// Make this after updating the deletedMetricIDs and resetting caches
// in order to exclude the possibility of the inconsistent state when the deleted metricIDs
// remain available in the tsidCache after unclean shutdown.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347
items := getIndexItems()
for _, metricID := range metricIDs {
items.B = append(items.B, nsPrefixDeletedMetricID)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
err := db.tb.AddItems(items.Items)
putIndexItems(items)
return err
}
func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
@ -1709,7 +1689,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
if len(tfss) == 0 {
return nil, nil
}
if tr.MinTimestamp >= db.minTimestampForCompositeIndex {
if tr.MinTimestamp >= db.s.minTimestampForCompositeIndex {
tfss = convertToCompositeTagFilterss(tfss)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
@ -454,15 +455,11 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
}
func TestIndexDBOpenClose(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
for i := 0; i < 5; i++ {
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB("test-index-db", s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -477,15 +474,11 @@ func TestIndexDB(t *testing.T) {
const metricGroups = 10
t.Run("serial", func(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-serial"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -515,7 +508,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected.
db.MustClose()
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err = openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -531,15 +524,11 @@ func TestIndexDB(t *testing.T) {
})
t.Run("concurrent", func(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-concurrent"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1463,15 +1452,11 @@ func TestMatchTagFilters(t *testing.T) {
}
func TestSearchTSIDWithTimeRange(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-ts-range"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1724,3 +1709,20 @@ func toTFPointers(tfs []tagFilter) []*tagFilter {
}
return tfps
}
func newTestStorage() *Storage {
return &Storage{
cachePath: "test-storage-cache",
metricIDCache: workingsetcache.New(1234, time.Hour),
metricNameCache: workingsetcache.New(1234, time.Hour),
tsidCache: workingsetcache.New(1234, time.Hour),
}
}
func stopTestStorage(s *Storage) {
s.metricIDCache.Stop()
s.metricNameCache.Stop()
s.tsidCache.Stop()
fs.MustRemoveAll(s.cachePath)
}

View file

@ -7,8 +7,6 @@ import (
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
func BenchmarkRegexpFilterMatch(b *testing.B) {
@ -42,15 +40,11 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) {
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const recordsPerLoop = 1e3
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-add-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -107,15 +101,11 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
func BenchmarkHeadPostingForMatchers(b *testing.B) {
// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-head-posting-for-matchers"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -286,15 +276,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-get-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}

View file

@ -201,7 +201,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
}
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
if err != nil {
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
}
@ -579,7 +579,7 @@ func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
idbNewPath := s.path + "/indexdb/" + newTableName
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
idbNew, err := openIndexDB(idbNewPath, s)
if err != nil {
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
}
@ -599,7 +599,7 @@ func (s *Storage) mustRotateIndexDB() {
fs.MustSyncPath(s.path)
// Flush tsidCache, so idbNew can be populated with fresh data.
s.tsidCache.Reset()
s.resetAndSaveTSIDCache()
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s.dateMetricIDCache.Reset()
@ -610,6 +610,11 @@ func (s *Storage) mustRotateIndexDB() {
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
}
func (s *Storage) resetAndSaveTSIDCache() {
s.tsidCache.Reset()
s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
}
// MustClose closes the storage.
//
// It is expected that the s is no longer used during the close.
@ -624,9 +629,12 @@ func (s *Storage) MustClose() {
s.idb().MustClose()
// Save caches.
s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.tsidCache.Stop()
s.mustSaveCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.metricIDCache.Stop()
s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.metricNameCache.Stop()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@ -853,7 +861,10 @@ func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcac
return c
}
func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name string) {
func (s *Storage) mustSaveCache(c *workingsetcache.Cache, info, name string) {
saveCacheLock.Lock()
defer saveCacheLock.Unlock()
path := s.cachePath + "/" + name
logger.Infof("saving %s cache to %q...", info, path)
startTime := time.Now()
@ -862,11 +873,13 @@ func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name stri
}
var cs fastcache.Stats
c.UpdateStats(&cs)
c.Stop()
logger.Infof("saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d",
info, path, time.Since(startTime).Seconds(), cs.EntriesCount, cs.BytesSize)
}
// saveCacheLock prevents from data races when multiple concurrent goroutines save the same cache.
var saveCacheLock sync.Mutex
func nextRetentionDuration(retentionMsecs int64) time.Duration {
// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected.
retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay
@ -2158,7 +2171,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
s.tsidCache.Set(metricName, buf)
}
func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (curr, prev *indexDB, err error) {
func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
}
@ -2217,12 +2230,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
// Open the last two tables.
currPath := path + "/" + tableNames[len(tableNames)-1]
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
curr, err = openIndexDB(currPath, s)
if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
}
prevPath := path + "/" + tableNames[len(tableNames)-2]
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
prev, err = openIndexDB(prevPath, s)
if err != nil {
curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)