From c3b1d9ee2171ceba61c79d275661890a7418d9fb Mon Sep 17 00:00:00 2001 From: Roman Khavronenko <roman@victoriametrics.com> Date: Wed, 17 May 2023 00:46:42 +0200 Subject: [PATCH] lib/storage: introduce per-day MetricName=>TSID index (#4252) The new index substitutes the global MetricName=>TSID index used for locating TSIDs on the ingestion path. For installations with a high ingestion and churn rate, the global MetricName=>TSID index can grow enormously, making index lookups too expensive. This also results in bigger than expected cache growth for indexdb blocks. The new per-day index is supposed to be much smaller and more efficient. This should improve ingestion speed and reliability during re-routings in the cluster. The negative outcome could be increased disk space usage, since the per-day index is more expensive compared to the global index. Signed-off-by: hagen1778 <roman@victoriametrics.com> --- lib/storage/index_db.go | 54 +++++++++++++++++++++--------------- lib/storage/index_db_test.go | 13 +++++---- lib/storage/storage.go | 42 ++++++++++++---------------- 3 files changed, 57 insertions(+), 52 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 3cd1f7001..00f855429 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -31,7 +31,11 @@ import ( const ( // Prefix for MetricName->TSID entries. - nsPrefixMetricNameToTSID = 0 + // This index was substituted with nsPrefixDateMetricNameToTSID + // for the sake of resource efficiency. The intention is to speed up + // index lookups on busy installations. Looking up in the per-day + // index is supposed to be much more efficient compared to the global index. + // nsPrefixMetricNameToTSID = 0 // Prefix for Tag->MetricID entries. nsPrefixTagToMetricIDs = 1 @@ -50,6 +54,9 @@ const ( // Prefix for (Date,Tag)->MetricID entries. nsPrefixDateTagToMetricIDs = 6 + + // Prefix for (Date,MetricName)->TSID entries. + nsPrefixDateMetricNameToTSID = 7 ) // indexDB represents an index db. 
@@ -406,7 +413,7 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date } mn.sortTags() is.createGlobalIndexes(tsid, mn) - is.createPerDayIndexes(date, tsid.MetricID, mn) + is.createPerDayIndexes(date, tsid, mn) PutMetricName(mn) atomic.AddUint64(&is.db.timeseriesRepopulated, 1) return true, nil @@ -489,9 +496,9 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) { // getTSIDByNameNoCreate fills the dst with TSID for the given metricName. // // It returns io.EOF if the given mn isn't found locally. -func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error { +func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte, date uint64) error { is := db.getIndexSearch(0, 0, noDeadline) - err := is.getTSIDByMetricName(dst, metricName) + err := is.getTSIDByMetricName(dst, metricName, date) db.putIndexSearch(is) if err == nil { return nil @@ -534,7 +541,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRa // This should improve insertion performance for big batches // of new time series. if is.tsidByNameMisses < 100 { - err := is.getTSIDByMetricName(dst, metricName) + err := is.getTSIDByMetricName(dst, metricName, date) if err == nil { // Fast path - the TSID for the given metricName has been found in the index. 
is.tsidByNameMisses = 0 @@ -605,7 +612,7 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err) } - created, err := is.db.getOrCreateTSID(dst, metricName, mn) + created, err := is.db.getOrCreateTSID(dst, metricName, mn, date) if err != nil { return fmt.Errorf("cannot generate TSID: %w", err) } @@ -613,7 +620,7 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b return err } is.createGlobalIndexes(dst, mn) - is.createPerDayIndexes(date, dst.MetricID, mn) + is.createPerDayIndexes(date, dst, mn) // There is no need in invalidating tag cache, since it is invalidated // on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable. @@ -640,12 +647,12 @@ var logNewSeries = false // getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found. // // Returns true if TSID was created or false if TSID was in extDB -func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) { +func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName, date uint64) (bool, error) { // Search the TSID in the external storage. // This is usually the db from the previous period. var err error if db.doExtDB(func(extDB *indexDB) { - err = extDB.getTSIDByNameNoCreate(dst, metricName) + err = extDB.getTSIDByNameNoCreate(dst, metricName, date) }) { if err == nil { // The TSID has been found in the external storage. @@ -686,13 +693,6 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) { ii := getIndexItems() defer putIndexItems(ii) - // Create MetricName -> TSID index. - ii.B = append(ii.B, nsPrefixMetricNameToTSID) - ii.B = mn.Marshal(ii.B) - ii.B = append(ii.B, kvSeparatorChar) - ii.B = tsid.Marshal(ii.B) - ii.Next() - // Create MetricID -> MetricName index. 
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID) ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID) @@ -2089,11 +2089,12 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj var tagFiltersKeyBufPool bytesutil.ByteBufferPool -func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { +func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date uint64) error { dmis := is.db.s.getDeletedMetricIDs() ts := &is.ts kb := &is.kb - kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) + kb.B = append(kb.B[:0], nsPrefixDateMetricNameToTSID) + kb.B = encoding.MarshalUint64(kb.B, date) kb.B = append(kb.B, metricName...) kb.B = append(kb.B, kvSeparatorChar) ts.Seek(kb.B) @@ -2975,13 +2976,22 @@ const ( int64Max = int64((1 << 63) - 1) ) -func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) { +func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) { ii := getIndexItems() defer putIndexItems(ii) ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID, mn.AccountID, mn.ProjectID) ii.B = encoding.MarshalUint64(ii.B, date) - ii.B = encoding.MarshalUint64(ii.B, metricID) + ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID) + ii.Next() + + // Create per-day inverted index entries for TSID. + // do not use marshalCommonPrefix() here, since the mn itself contains (AccountID, ProjectID) prefix. + ii.B = append(ii.B, nsPrefixDateMetricNameToTSID) + ii.B = encoding.MarshalUint64(ii.B, date) + ii.B = mn.Marshal(ii.B) + ii.B = append(ii.B, kvSeparatorChar) + ii.B = tsid.Marshal(ii.B) ii.Next() // Create per-day inverted index entries for metricID. 
@@ -2989,9 +2999,9 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName defer kbPool.Put(kb) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, mn.AccountID, mn.ProjectID) kb.B = encoding.MarshalUint64(kb.B, date) - ii.registerTagIndexes(kb.B, mn, metricID) + ii.registerTagIndexes(kb.B, mn, tsid.MetricID) is.db.tb.AddItems(ii.Items) - is.db.s.dateMetricIDCache.Set(date, metricID) + is.db.s.dateMetricIDCache.Set(date, tsid.MetricID) } func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 3a84b32a9..0ce4ddb53 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -612,6 +612,8 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, is := db.getIndexSearch(0, 0, noDeadline) defer db.putIndexSearch(is) + date := uint64(timestampFromTime(time.Now())) / msecPerDay + var metricNameBuf []byte var metricNameRawBuf []byte for i := 0; i < 4e2+1; i++ { @@ -637,7 +639,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, // Create tsid for the metricName. 
var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, date); err != nil { return nil, nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err) } if tsid.AccountID != mn.AccountID { @@ -652,10 +654,9 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, } // fill Date -> MetricID cache - date := uint64(timestampFromTime(time.Now())) / msecPerDay for i := range tsids { tsid := &tsids[i] - is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) + is.createPerDayIndexes(date, tsid, &mns[i]) } // Flush index to disk, so it becomes visible for search @@ -679,6 +680,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten return false } + currentTime := timestampFromTime(time.Now()) allLabelNames := make(map[accountProjectKey]map[string]bool) timeseriesCounters := make(map[accountProjectKey]map[uint64]bool) var tsidCopy TSID @@ -701,7 +703,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten mn.sortTags() metricName := mn.Marshal(nil) - if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil { + if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName, uint64(currentTime)/msecPerDay); err != nil { return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err) } if isConcurrent { @@ -785,7 +787,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } // Test SearchTenants on specific time range - currentTime := timestampFromTime(time.Now()) tr := TimeRange{ MinTimestamp: currentTime - msecPerDay, MaxTimestamp: currentTime + msecPerDay, @@ -1734,7 +1735,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { for i := range tsids { tsid := &tsids[i] metricIDs.Add(tsid.MetricID) - is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) + is.createPerDayIndexes(date, tsid, &mns[i]) } 
allMetricIDs.Union(&metricIDs) perDayMetricIDs[date] = &metricIDs diff --git a/lib/storage/storage.go b/lib/storage/storage.go index b720af701..c65b7c4c2 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1983,11 +1983,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { // pMin linearly increases from 0 to 1 during the last hour of the day. pMin := (float64(ts%(3600*24)) / 3600) - 23 type pendingDateMetricID struct { - date uint64 - metricID uint64 - accountID uint32 - projectID uint32 - mr *MetricRow + date uint64 + tsid *TSID + mr *MetricRow } var pendingDateMetricIDs []pendingDateMetricID var pendingNextDayMetricIDs []uint64 @@ -2020,11 +2018,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { p := float64(uint32(fastHashUint64(metricID))) / (1 << 32) if p < pMin && !nextDayMetricIDs.Has(metricID) { pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ - date: date + 1, - metricID: metricID, - accountID: r.TSID.AccountID, - projectID: r.TSID.ProjectID, - mr: mrs[i], + date: date + 1, + tsid: &r.TSID, + mr: mrs[i], }) pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) } @@ -2049,11 +2045,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } // Slow path: store the (date, metricID) entry in the indexDB. 
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ - date: date, - metricID: metricID, - accountID: r.TSID.AccountID, - projectID: r.TSID.ProjectID, - mr: mrs[i], + date: date, + tsid: &r.TSID, + mr: mrs[i], }) } if len(pendingNextDayMetricIDs) > 0 { @@ -2078,16 +2072,16 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { sort.Slice(pendingDateMetricIDs, func(i, j int) bool { a := pendingDateMetricIDs[i] b := pendingDateMetricIDs[j] - if a.accountID != b.accountID { - return a.accountID < b.accountID + if a.tsid.AccountID != b.tsid.AccountID { + return a.tsid.AccountID < b.tsid.AccountID } - if a.projectID != b.projectID { - return a.projectID < b.projectID + if a.tsid.ProjectID != b.tsid.ProjectID { + return a.tsid.ProjectID < b.tsid.ProjectID } if a.date != b.date { return a.date < b.date } - return a.metricID < b.metricID + return a.tsid.MetricID < b.tsid.MetricID }) idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) @@ -2097,12 +2091,12 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { mn := GetMetricName() for _, dmid := range pendingDateMetricIDs { date := dmid.date - metricID := dmid.metricID - ok, err := is.hasDateMetricID(date, metricID, dmid.accountID, dmid.projectID) + metricID := dmid.tsid.MetricID + ok, err := is.hasDateMetricID(date, metricID, dmid.tsid.AccountID, dmid.tsid.ProjectID) if err != nil { if firstError == nil { firstError = fmt.Errorf("error when locating (date=%s, metricID=%d, accountID=%d, projectID=%d) in database: %w", - dateToString(date), metricID, dmid.accountID, dmid.projectID, err) + dateToString(date), metricID, dmid.tsid.AccountID, dmid.tsid.ProjectID, err) } continue } @@ -2117,7 +2111,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { continue } mn.sortTags() - is.createPerDayIndexes(date, metricID, mn) + is.createPerDayIndexes(date, dmid.tsid, mn) } dateMetricIDsForCache = append(dateMetricIDsForCache, 
dateMetricID{ date: date,