diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 51b315932..9fea8965b 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -462,16 +462,12 @@ func getStorageSearch() *storage.Search { } func putStorageSearch(sr *storage.Search) { - n := atomic.LoadUint64(&sr.MissingMetricNamesForMetricID) - missingMetricNamesForMetricID.Add(int(n)) sr.MustClose() ssPool.Put(sr) } var ssPool sync.Pool -var missingMetricNamesForMetricID = metrics.NewCounter(`vm_missing_metric_names_for_metric_id_total`) - // ProcessSearchQuery performs sq on storage nodes until the given deadline. func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) { // Setup search. diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index c1617572b..d00a0ef67 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -414,6 +414,10 @@ func registerStorageMetrics() { return float64(idbm().DateRangeSearchHits) }) + metrics.NewGauge(`vm_missing_metric_names_for_metric_id_total`, func() float64 { + return float64(idbm().MissingMetricNamesForMetricID) + }) + metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 { return float64(m().DateMetricIDCacheSyncsCount) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index deab27d8d..e6c7c0e05 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -101,6 +101,11 @@ type indexDB struct { // The number of hits for date range searches. dateRangeSearchHits uint64 + // missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries. + // High rate may mean corrupted indexDB due to unclean shutdown. + // The db must be automatically recovered after that. + missingMetricNamesForMetricID uint64 + mustDrop uint64 // Start date fully covered by per-day inverted index. @@ -228,6 +233,8 @@ type IndexDBMetrics struct { DateRangeSearchCalls uint64 DateRangeSearchHits uint64 + MissingMetricNamesForMetricID uint64 + IndexBlocksWithMetricIDsProcessed uint64 IndexBlocksWithMetricIDsIncorrectOrder uint64 @@ -269,6 +276,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) + m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID) + m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) @@ -888,6 +897,13 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64) ([]byte, error) // Cannot find MetricName for the given metricID. This may be the case // when indexDB contains incomplete set of metricID -> metricName entries // after a snapshot or due to unflushed entries. + atomic.AddUint64(&db.missingMetricNamesForMetricID, 1) + + // Mark the metricID as deleted, so it will be created again when new data point + // for the given time series will arrive. + if err := db.deleteMetricIDs([]uint64{metricID}); err != nil { + return dst, fmt.Errorf("cannot delete metricID for missing metricID->metricName entry; metricID=%d; error: %s", metricID, err) + } return dst, io.EOF } @@ -912,9 +928,28 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { if err != nil { return 0, err } + if err := db.deleteMetricIDs(metricIDs); err != nil { + return 0, err + } + + // Delete TSIDs in the extDB. + deletedCount := len(metricIDs) + if db.doExtDB(func(extDB *indexDB) { + var n int + n, err = extDB.DeleteTSIDs(tfss) + deletedCount += n + }) { + if err != nil { + return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err) + } + } + return deletedCount, nil +} + +func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { if len(metricIDs) == 0 { // Nothing to delete - return 0, nil + return nil } // Mark the found metricIDs as deleted. @@ -924,12 +959,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { items.B = encoding.MarshalUint64(items.B, metricID) items.Next() } - err = db.tb.AddItems(items.Items) + err := db.tb.AddItems(items.Items) putIndexItems(items) if err != nil { - return 0, err + return err } - deletedCount := len(metricIDs) // atomically add deleted metricIDs to an inmemory map. dmis := &uint64set.Set{} @@ -943,18 +977,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { // Do not reset uselessTagFiltersCache, since the found metricIDs // on cache miss are filtered out later with deletedMetricIDs. - - // Delete TSIDs in the extDB. - if db.doExtDB(func(extDB *indexDB) { - var n int - n, err = extDB.DeleteTSIDs(tfss) - deletedCount += n - }) { - if err != nil { - return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err) - } - } - return deletedCount, nil + return nil } func (db *indexDB) getDeletedMetricIDs() *uint64set.Set { diff --git a/lib/storage/search.go b/lib/storage/search.go index 32be4a462..51f076119 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -3,7 +3,6 @@ package storage import ( "fmt" "io" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -89,11 +88,6 @@ type Search struct { err error needClosing bool - - // MissingMetricNamesForMetricID is a counter of missing MetricID -> MetricName - // entries during the search. - // High rate may mean corrupted indexDB. - MissingMetricNamesForMetricID uint64 } func (s *Search) reset() { @@ -104,7 +98,6 @@ func (s *Search) reset() { s.ts.reset() s.err = nil s.needClosing = false - s.MissingMetricNamesForMetricID = 0 } // Init initializes s from the given storage, tfss and tr. @@ -161,8 +154,8 @@ func (s *Search) NextMetricBlock() bool { s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID) if err != nil { if err == io.EOF { - // Missing metricName for tsid.MetricID. Increment error counter and skip it. - atomic.AddUint64(&s.MissingMetricNamesForMetricID, 1) + // Skip missing metricName for tsid.MetricID. + // It should be automatically fixed. See indexDB.searchMetricName for details. continue } s.err = err