diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 1fff4a366..0be6a04ea 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -359,6 +359,10 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(idbm().DateRangeSearchHits) }) + metrics.NewGauge(`vm_missing_metric_names_for_metric_id_total`, func() float64 { + return float64(idbm().MissingMetricNamesForMetricID) + }) + metrics.NewGauge(`vm_date_metric_id_cache_syncs_total`, func() float64 { return float64(m().DateMetricIDCacheSyncsCount) }) diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index b09e07d0c..3b4d194bb 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -330,10 +330,7 @@ func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error { sizeBuf: make([]byte, 8), } for { - err := s.processVMSelectRequest(ctx) - n := atomic.LoadUint64(&ctx.sr.MissingMetricNamesForMetricID) - missingMetricNamesForMetricID.Add(int(n)) - if err != nil { + if err := s.processVMSelectRequest(ctx); err != nil { if err == io.EOF { // Remote client gracefully closed the connection. return nil @@ -346,8 +343,6 @@ func (s *Server) processVMSelectConn(bc *handshake.BufferedConn) error { } } -var missingMetricNamesForMetricID = metrics.NewCounter(`vm_missing_metric_names_for_metric_id_total`) - type vmselectRequestCtx struct { bc *handshake.BufferedConn sizeBuf []byte diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index cbab45e01..a80b30b5d 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -101,6 +101,11 @@ type indexDB struct { // The number of hits for date range searches. dateRangeSearchHits uint64 + // missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries. + // High rate may mean corrupted indexDB due to unclean shutdown. + // The db must be automatically recovered after that. + missingMetricNamesForMetricID uint64 + mustDrop uint64 // Start date fully covered by per-day inverted index. @@ -228,6 +233,8 @@ type IndexDBMetrics struct { DateRangeSearchCalls uint64 DateRangeSearchHits uint64 + MissingMetricNamesForMetricID uint64 + IndexBlocksWithMetricIDsProcessed uint64 IndexBlocksWithMetricIDsIncorrectOrder uint64 @@ -269,6 +276,8 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) + m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID) + m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) @@ -903,6 +912,13 @@ func (db *indexDB) searchMetricName(dst []byte, metricID uint64, accountID, proj // Cannot find MetricName for the given metricID. This may be the case // when indexDB contains incomplete set of metricID -> metricName entries // after a snapshot or due to unflushed entries. + atomic.AddUint64(&db.missingMetricNamesForMetricID, 1) + + // Mark the metricID as deleted, so it will be created again when new data point + // for the given time series will arrive. + if err := db.deleteMetricIDs([]uint64{metricID}); err != nil { + return dst, fmt.Errorf("cannot delete metricID for missing metricID->metricName entry; metricID=%d; error: %s", metricID, err) + } return dst, io.EOF } @@ -927,9 +943,28 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { if err != nil { return 0, err } + if err := db.deleteMetricIDs(metricIDs); err != nil { + return 0, err + } + + // Delete TSIDs in the extDB. + deletedCount := len(metricIDs) + if db.doExtDB(func(extDB *indexDB) { + var n int + n, err = extDB.DeleteTSIDs(tfss) + deletedCount += n + }) { + if err != nil { + return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err) + } + } + return deletedCount, nil +} + +func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { if len(metricIDs) == 0 { // Nothing to delete - return 0, nil + return nil } // Mark the found metricIDs as deleted. @@ -939,12 +974,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { items.B = encoding.MarshalUint64(items.B, metricID) items.Next() } - err = db.tb.AddItems(items.Items) + err := db.tb.AddItems(items.Items) putIndexItems(items) if err != nil { - return 0, err + return err } - deletedCount := len(metricIDs) // atomically add deleted metricIDs to an inmemory map. dmis := &uint64set.Set{} @@ -958,18 +992,7 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { // Do not reset uselessTagFiltersCache, since the found metricIDs // on cache miss are filtered out later with deletedMetricIDs. - - // Delete TSIDs in the extDB. - if db.doExtDB(func(extDB *indexDB) { - var n int - n, err = extDB.DeleteTSIDs(tfss) - deletedCount += n - }) { - if err != nil { - return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %s", err) - } - } - return deletedCount, nil + return nil } func (db *indexDB) getDeletedMetricIDs() *uint64set.Set { diff --git a/lib/storage/search.go b/lib/storage/search.go index 0433c8aa3..a2cfa4eb4 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -3,7 +3,6 @@ package storage import ( "fmt" "io" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -89,11 +88,6 @@ type Search struct { err error needClosing bool - - // MissingMetricNamesForMetricID is a counter of missing MetricID -> MetricName - // entries during the search. - // High rate may mean corrupted indexDB. - MissingMetricNamesForMetricID uint64 } func (s *Search) reset() { @@ -104,7 +98,6 @@ func (s *Search) reset() { s.ts.reset() s.err = nil s.needClosing = false - s.MissingMetricNamesForMetricID = 0 } // Init initializes s from the given storage, tfss and tr. @@ -161,8 +154,8 @@ func (s *Search) NextMetricBlock() bool { s.MetricBlock.MetricName, err = s.storage.searchMetricName(s.MetricBlock.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID) if err != nil { if err == io.EOF { - // Missing metricName for tsid.MetricID. Increment error counter and skip it. - atomic.AddUint64(&s.MissingMetricNamesForMetricID, 1) + // Skip missing metricName for tsid.MetricID. + // It should be automatically fixed. See indexDB.searchMetricName for details. continue } s.err = err