diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 3d1c3c57c..ee3600358 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -390,13 +390,13 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) { db.s.metricNameCache.Set(key[:], metricName) } -// maybeCreateIndexes probabilistically creates indexes for the given (tsid, metricNameRaw) at db. +// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db. // // The probability increases from 0 to 100% during the first hour since db rotation. // // It returns true if new index entry was created, and false if it was skipped. -func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) { - pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600 +func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) { + pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600 if pMin < 1 { p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32) if p > pMin { @@ -410,11 +410,14 @@ func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, e return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err) } mn.sortTags() - if err := db.createIndexes(tsid, mn); err != nil { - return false, err + if err := is.createGlobalIndexes(tsid, mn); err != nil { + return false, fmt.Errorf("cannot create global indexes: %w", err) + } + if err := is.createPerDayIndexes(date, tsid.MetricID, mn); err != nil { + return false, fmt.Errorf("cannot create per-day indexes for date=%d: %w", date, err) } PutMetricName(mn) - atomic.AddUint64(&db.timeseriesRepopulated, 1) + atomic.AddUint64(&is.db.timeseriesRepopulated, 1) return true, nil } @@ -515,7 +518,10 @@ type indexSearch struct { } // GetOrCreateTSIDByName fills the dst with TSID for the given metricName. -func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error { +// +// It also registers the metricName in global and per-day indexes +// for the given date if the metricName->TSID entry is missing in the index. +func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date uint64) error { // A hack: skip searching for the TSID after many serial misses. // This should improve insertion performance for big batches // of new time series. @@ -540,7 +546,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error // TSID for the given name wasn't found. Create it. // It is OK if duplicate TSID for mn is created by concurrent goroutines. // Metric results will be merged by mn after TableSearch. - if err := is.db.createTSIDByName(dst, metricName); err != nil { + if err := is.createTSIDByName(dst, metricName, date); err != nil { return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err) } return nil @@ -571,22 +577,25 @@ func (db *indexDB) putIndexSearch(is *indexSearch) { db.indexSearchPool.Put(is) } -func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error { +func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint64) error { mn := GetMetricName() defer PutMetricName(mn) if err := mn.Unmarshal(metricName); err != nil { return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err) } - created, err := db.getOrCreateTSID(dst, metricName, mn) + created, err := is.db.getOrCreateTSID(dst, metricName, mn) if err != nil { return fmt.Errorf("cannot generate TSID: %w", err) } - if !db.s.registerSeriesCardinality(dst.MetricID, mn) { + if !is.db.s.registerSeriesCardinality(dst.MetricID, mn) { return errSeriesCardinalityExceeded } - if err := db.createIndexes(dst, mn); err != nil { - return fmt.Errorf("cannot create indexes: %w", err) + if err := is.createGlobalIndexes(dst, mn); err != nil { + return fmt.Errorf("cannot create global indexes: %w", err) + } + if err := is.createPerDayIndexes(date, dst.MetricID, mn); err != nil { + return fmt.Errorf("cannot create per-day indexes for date=%d: %w", date, err) } // There is no need in invalidating tag cache, since it is invalidated @@ -594,7 +603,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error { if created { // Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB - atomic.AddUint64(&db.newTimeseriesCreated, 1) + atomic.AddUint64(&is.db.newTimeseriesCreated, 1) if logNewSeries { logger.Infof("new series created: %s", mn.String()) } @@ -653,7 +662,7 @@ func generateTSID(dst *TSID, mn *MetricName) { dst.MetricID = generateUniqueMetricID() } -func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error { +func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error { // The order of index items is important. // It guarantees index consistency. @@ -684,7 +693,7 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error { ii.registerTagIndexes(prefix.B, mn, tsid.MetricID) kbPool.Put(prefix) - return db.tb.AddItems(ii.Items) + return is.db.tb.AddItems(ii.Items) } type indexItems struct { @@ -2686,11 +2695,11 @@ const ( int64Max = int64((1 << 63) - 1) ) -func (is *indexSearch) storeDateMetricID(date, metricID uint64, mn *MetricName) error { +func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) error { ii := getIndexItems() defer putIndexItems(ii) - ii.B = is.marshalCommonPrefix(ii.B, nsPrefixDateToMetricID) + ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID) ii.B = encoding.MarshalUint64(ii.B, date) ii.B = encoding.MarshalUint64(ii.B, metricID) ii.Next() @@ -2698,7 +2707,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64, mn *MetricName) // Create per-day inverted index entries for metricID. kb := kbPool.Get() defer kbPool.Put(kb) - kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = encoding.MarshalUint64(kb.B, date) ii.registerTagIndexes(kb.B, mn, metricID) if err := is.db.tb.AddItems(ii.Items); err != nil { @@ -2817,7 +2826,7 @@ func reverseBytes(dst, src []byte) []byte { func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { ts := &is.ts kb := &is.kb - kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) + kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, metricID) if err := ts.FirstItemWithPrefix(kb.B); err != nil { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 7a852a3c6..63dba094a 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -604,7 +604,7 @@ func testIndexDBBigMetricName(db *indexDB) error { mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...) mn.sortTags() metricName := mn.Marshal(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil { return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup") } @@ -617,7 +617,7 @@ func testIndexDBBigMetricName(db *indexDB) error { }} mn.sortTags() metricName = mn.Marshal(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil { return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key") } @@ -630,7 +630,7 @@ func testIndexDBBigMetricName(db *indexDB) error { }} mn.sortTags() metricName = mn.Marshal(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil { return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value") } @@ -645,7 +645,7 @@ func testIndexDBBigMetricName(db *indexDB) error { } mn.sortTags() metricName = mn.Marshal(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil { return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags") } @@ -679,7 +679,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa // Create tsid for the metricName. var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil { return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err) } @@ -691,8 +691,8 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa date := uint64(timestampFromTime(time.Now())) / msecPerDay for i := range tsids { tsid := &tsids[i] - if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { - return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d): %w", date, tsid.MetricID, err) + if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil { + return nil, nil, fmt.Errorf("error in createPerDayIndexes(%d, %d): %w", date, tsid.MetricID, err) } } @@ -1662,7 +1662,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { metricNameBuf = mn.Marshal(metricNameBuf[:0]) var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil { t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err) } mns = append(mns, mn) @@ -1675,8 +1675,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { for i := range tsids { tsid := &tsids[i] metricIDs.Add(tsid.MetricID) - if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { - t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) + if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil { + t.Fatalf("error in createPerDayIndexes(%d, %d): %s", date, tsid.MetricID, err) } } allMetricIDs.Union(&metricIDs) diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 3f71a4171..3498052a9 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -93,7 +93,7 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs } mn.sortTags() metricName = mn.Marshal(metricName[:0]) - if err := is.GetOrCreateTSIDByName(tsid, metricName); err != nil { + if err := is.GetOrCreateTSIDByName(tsid, metricName, 0); err != nil { panic(fmt.Errorf("cannot insert record: %w", err)) } } @@ -122,6 +122,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { var mn MetricName var metricName []byte var tsid TSID + is := db.getIndexSearch(noDeadline) + defer db.putIndexSearch(is) addSeries := func(kvs ...string) { mn.Reset() for i := 0; i < len(kvs); i += 2 { @@ -129,20 +131,20 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { } mn.sortTags() metricName = mn.Marshal(metricName[:0]) - if err := db.createTSIDByName(&tsid, metricName); err != nil { + if err := is.createTSIDByName(&tsid, metricName, 0); err != nil { b.Fatalf("cannot insert record: %s", err) } } for n := 0; n < 10; n++ { ns := strconv.Itoa(n) for i := 0; i < 100000; i++ { - is := strconv.Itoa(i) - addSeries("i", is, "n", ns, "j", "foo") + ix := strconv.Itoa(i) + addSeries("i", ix, "n", ns, "j", "foo") // Have some series that won't be matched, to properly test inverted matches. - addSeries("i", is, "n", ns, "j", "bar") - addSeries("i", is, "n", "0_"+ns, "j", "bar") - addSeries("i", is, "n", "1_"+ns, "j", "bar") - addSeries("i", is, "n", "2_"+ns, "j", "foo") + addSeries("i", ix, "n", ns, "j", "bar") + addSeries("i", ix, "n", "0_"+ns, "j", "bar") + addSeries("i", ix, "n", "1_"+ns, "j", "bar") + addSeries("i", ix, "n", "2_"+ns, "j", "foo") } } @@ -313,7 +315,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { for i := 0; i < recordsCount; i++ { mn.sortTags() metricName = mn.Marshal(metricName[:0]) - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err != nil { b.Fatalf("cannot insert record: %s", err) } } @@ -331,7 +333,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { for i := 0; i < recordsPerLoop; i++ { mnLocal.sortTags() metricNameLocal = mnLocal.Marshal(metricNameLocal[:0]) - if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal); err != nil { + if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, 0); err != nil { panic(fmt.Errorf("cannot obtain tsid: %w", err)) } } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 496c52754..37664dbab 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -366,7 +366,7 @@ func (s *Storage) CreateSnapshot() (string, error) { srcMetadataDir := srcDir + "/metadata" dstMetadataDir := dstDir + "/metadata" if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil { - return "", fmt.Errorf("cannot copy metadata: %s", err) + return "", fmt.Errorf("cannot copy metadata: %w", err) } fs.MustSyncPath(dstDir) @@ -1666,10 +1666,7 @@ var ( // The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. // Th MetricRow.Value field is ignored. func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { - var ( - metricName []byte - ) - + var metricName []byte var genTSID generationTSID mn := GetMetricName() defer PutMetricName(mn) @@ -1680,67 +1677,35 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { for i := range mrs { mr := &mrs[i] if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { - if genTSID.generation != idb.generation { - // The found entry is from the previous cache generation - // so attempt to re-populate the current generation with this entry. - // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 - created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw) - if err != nil { - return fmt.Errorf("cannot create indexes in the current indexdb: %w", err) - } - if created { - genTSID.generation = idb.generation - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) - } + if genTSID.generation == idb.generation { + // Fast path - mr.MetricNameRaw has been already registered in the current idb. + continue } - // Fast path - mr.MetricNameRaw has been already registered. - continue } - // Slow path - register mr.MetricNameRaw. if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { - return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) } mn.sortTags() metricName = mn.Marshal(metricName[:0]) - if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName); err != nil { + date := uint64(mr.Timestamp) / msecPerDay + if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, date); err != nil { if errors.Is(err, errSeriesCardinalityExceeded) { continue } - return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err) + return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err) } + genTSID.generation = idb.generation s.putTSIDToCache(&genTSID, mr.MetricNameRaw) - - // Register the metric in per-day inverted index. - date := uint64(mr.Timestamp) / msecPerDay - metricID := genTSID.TSID.MetricID - if s.dateMetricIDCache.Has(date, metricID) { - // Fast path: the metric has been already registered in per-day inverted index - continue - } - - // Slow path: acutally register the metric in per-day inverted index. - ok, err := is.hasDateMetricID(date, metricID) - if err != nil { - return fmt.Errorf("cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w", - date, metricID, err) - } - if !ok { - // The (date, metricID) entry is missing in the indexDB. Add it there. - if err := is.storeDateMetricID(date, metricID, mn); err != nil { - return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w", - date, metricID, err) - } - } - // The metric must be added to cache only after it has been successfully added to indexDB. - s.dateMetricIDCache.Set(date, metricID) + s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID) } return nil } func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error { idb := s.idb() - j := 0 + is := idb.getIndexSearch(noDeadline) + defer idb.putIndexSearch(is) var ( // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. prevTSID TSID @@ -1753,6 +1718,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci // Return only the first error, since it has no sense in returning all errors. var firstWarn error + j := 0 for i := range mrs { mr := &mrs[i] if math.IsNaN(mr.Value) { @@ -1805,16 +1771,18 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci prevMetricNameRaw = mr.MetricNameRaw if genTSID.generation != idb.generation { - // The found entry is from the previous cache generation + // The found entry is from the previous cache generation, // so attempt to re-populate the current generation with this entry. // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 - created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw) + date := uint64(r.Timestamp) / msecPerDay + created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date) if err != nil { - return fmt.Errorf("cannot create indexes in the current indexdb: %w", err) + return fmt.Errorf("cannot create indexes: %w", err) } if created { genTSID.generation = idb.generation s.putTSIDToCache(&genTSID, mr.MetricNameRaw) + s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID) } } continue @@ -1842,7 +1810,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci sort.Slice(pendingMetricRows, func(i, j int) bool { return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName) }) - is := idb.getIndexSearch(noDeadline) prevMetricNameRaw = nil var slowInsertsCount uint64 for i := range pendingMetricRows { @@ -1861,7 +1828,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci continue } slowInsertsCount++ - if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil { + date := uint64(r.Timestamp) / msecPerDay + if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, date); err != nil { j-- if errors.Is(err, errSeriesCardinalityExceeded) { continue @@ -1877,11 +1845,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci genTSID.generation = idb.generation genTSID.TSID = r.TSID s.putTSIDToCache(&genTSID, mr.MetricNameRaw) + s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID) prevTSID = r.TSID prevMetricNameRaw = mr.MetricNameRaw } - idb.putIndexSearch(is) putPendingMetricRows(pmrs) atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) } @@ -1891,15 +1859,17 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci dstMrs = dstMrs[:j] rows = rows[:j] - var firstError error - if err := s.tb.AddRows(rows); err != nil { - firstError = fmt.Errorf("cannot add rows to table: %w", err) + err := s.updatePerDateData(rows, dstMrs) + if err != nil { + err = fmt.Errorf("cannot update per-date data: %w", err) + } else { + err = s.tb.AddRows(rows) + if err != nil { + err = fmt.Errorf("cannot add rows to table: %w", err) + } } - if err := s.updatePerDateData(rows, dstMrs); err != nil && firstError == nil { - firstError = fmt.Errorf("cannot update per-date data: %w", err) - } - if firstError != nil { - return fmt.Errorf("error occurred during rows addition: %w", firstError) + if err != nil { + return fmt.Errorf("error occurred during rows addition: %w", err) } return nil } @@ -2125,7 +2095,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { continue } if !ok { - // The (date, metricID) entry is missing in the indexDB. Add it there. + // The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil { @@ -2135,9 +2105,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { continue } mn.sortTags() - if err := is.storeDateMetricID(date, metricID, mn); err != nil { + if err := is.createPerDayIndexes(date, metricID, mn); err != nil { if firstError == nil { - firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err) + firstError = fmt.Errorf("error when storing per-date inverted index for (date=%d, metricID=%d): %w", date, metricID, err) } continue }