lib/storage: create per-day indexes together with global indexes when registering new time series

Previously the creation of per-day indexes and global indexes
for the newly registered time series was decoupled.

Now global indexes and per-day indexes for the current day are created together for new time series.
This should speed up registering new time series a bit.
This commit is contained in:
Aliaksandr Valialkin 2022-06-19 21:58:53 +03:00
parent 5fb45173ae
commit 55e7afae3a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 86 additions and 105 deletions

View file

@ -390,13 +390,13 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.s.metricNameCache.Set(key[:], metricName) db.s.metricNameCache.Set(key[:], metricName)
} }
// maybeCreateIndexes probabilistically creates indexes for the given (tsid, metricNameRaw) at db. // maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
// //
// The probability increases from 0 to 100% during the first hour since db rotation. // The probability increases from 0 to 100% during the first hour since db rotation.
// //
// It returns true if new index entry was created, and false if it was skipped. // It returns true if new index entry was created, and false if it was skipped.
func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, error) { func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
pMin := float64(fasttime.UnixTimestamp()-db.rotationTimestamp) / 3600 pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
if pMin < 1 { if pMin < 1 {
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32) p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
if p > pMin { if p > pMin {
@ -410,11 +410,14 @@ func (db *indexDB) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte) (bool, e
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err) return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
} }
mn.sortTags() mn.sortTags()
if err := db.createIndexes(tsid, mn); err != nil { if err := is.createGlobalIndexes(tsid, mn); err != nil {
return false, err return false, fmt.Errorf("cannot create global indexes: %w", err)
}
if err := is.createPerDayIndexes(date, tsid.MetricID, mn); err != nil {
return false, fmt.Errorf("cannot create per-day indexes for date=%d: %w", date, err)
} }
PutMetricName(mn) PutMetricName(mn)
atomic.AddUint64(&db.timeseriesRepopulated, 1) atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil return true, nil
} }
@ -515,7 +518,10 @@ type indexSearch struct {
} }
// GetOrCreateTSIDByName fills the dst with TSID for the given metricName. // GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error { //
// It also registers the metricName in global and per-day indexes
// for the given date if the metricName->TSID entry is missing in the index.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date uint64) error {
// A hack: skip searching for the TSID after many serial misses. // A hack: skip searching for the TSID after many serial misses.
// This should improve insertion performance for big batches // This should improve insertion performance for big batches
// of new time series. // of new time series.
@ -540,7 +546,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error
// TSID for the given name wasn't found. Create it. // TSID for the given name wasn't found. Create it.
// It is OK if duplicate TSID for mn is created by concurrent goroutines. // It is OK if duplicate TSID for mn is created by concurrent goroutines.
// Metric results will be merged by mn after TableSearch. // Metric results will be merged by mn after TableSearch.
if err := is.db.createTSIDByName(dst, metricName); err != nil { if err := is.createTSIDByName(dst, metricName, date); err != nil {
return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err) return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)
} }
return nil return nil
@ -571,22 +577,25 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
db.indexSearchPool.Put(is) db.indexSearchPool.Put(is)
} }
func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error { func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint64) error {
mn := GetMetricName() mn := GetMetricName()
defer PutMetricName(mn) defer PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil { if err := mn.Unmarshal(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err) return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
} }
created, err := db.getOrCreateTSID(dst, metricName, mn) created, err := is.db.getOrCreateTSID(dst, metricName, mn)
if err != nil { if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err) return fmt.Errorf("cannot generate TSID: %w", err)
} }
if !db.s.registerSeriesCardinality(dst.MetricID, mn) { if !is.db.s.registerSeriesCardinality(dst.MetricID, mn) {
return errSeriesCardinalityExceeded return errSeriesCardinalityExceeded
} }
if err := db.createIndexes(dst, mn); err != nil { if err := is.createGlobalIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create indexes: %w", err) return fmt.Errorf("cannot create global indexes: %w", err)
}
if err := is.createPerDayIndexes(date, dst.MetricID, mn); err != nil {
return fmt.Errorf("cannot create per-day indexes for date=%d: %w", date, err)
} }
// There is no need in invalidating tag cache, since it is invalidated // There is no need in invalidating tag cache, since it is invalidated
@ -594,7 +603,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
if created { if created {
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB // Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
atomic.AddUint64(&db.newTimeseriesCreated, 1) atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
if logNewSeries { if logNewSeries {
logger.Infof("new series created: %s", mn.String()) logger.Infof("new series created: %s", mn.String())
} }
@ -653,7 +662,7 @@ func generateTSID(dst *TSID, mn *MetricName) {
dst.MetricID = generateUniqueMetricID() dst.MetricID = generateUniqueMetricID()
} }
func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error { func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error {
// The order of index items is important. // The order of index items is important.
// It guarantees index consistency. // It guarantees index consistency.
@ -684,7 +693,7 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID) ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix) kbPool.Put(prefix)
return db.tb.AddItems(ii.Items) return is.db.tb.AddItems(ii.Items)
} }
type indexItems struct { type indexItems struct {
@ -2686,11 +2695,11 @@ const (
int64Max = int64((1 << 63) - 1) int64Max = int64((1 << 63) - 1)
) )
func (is *indexSearch) storeDateMetricID(date, metricID uint64, mn *MetricName) error { func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) error {
ii := getIndexItems() ii := getIndexItems()
defer putIndexItems(ii) defer putIndexItems(ii)
ii.B = is.marshalCommonPrefix(ii.B, nsPrefixDateToMetricID) ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
ii.B = encoding.MarshalUint64(ii.B, date) ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = encoding.MarshalUint64(ii.B, metricID) ii.B = encoding.MarshalUint64(ii.B, metricID)
ii.Next() ii.Next()
@ -2698,7 +2707,7 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64, mn *MetricName)
// Create per-day inverted index entries for metricID. // Create per-day inverted index entries for metricID.
kb := kbPool.Get() kb := kbPool.Get()
defer kbPool.Put(kb) defer kbPool.Put(kb)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, metricID) ii.registerTagIndexes(kb.B, mn, metricID)
if err := is.db.tb.AddItems(ii.Items); err != nil { if err := is.db.tb.AddItems(ii.Items); err != nil {
@ -2817,7 +2826,7 @@ func reverseBytes(dst, src []byte) []byte {
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = encoding.MarshalUint64(kb.B, metricID) kb.B = encoding.MarshalUint64(kb.B, metricID)
if err := ts.FirstItemWithPrefix(kb.B); err != nil { if err := ts.FirstItemWithPrefix(kb.B); err != nil {

View file

@ -604,7 +604,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...) mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...)
mn.sortTags() mn.sortTags()
metricName := mn.Marshal(nil) metricName := mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup") return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup")
} }
@ -617,7 +617,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
}} }}
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(nil) metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key") return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key")
} }
@ -630,7 +630,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
}} }}
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(nil) metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value") return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value")
} }
@ -645,7 +645,7 @@ func testIndexDBBigMetricName(db *indexDB) error {
} }
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(nil) metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err == nil { if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags") return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags")
} }
@ -679,7 +679,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
// Create tsid for the metricName. // Create tsid for the metricName.
var tsid TSID var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err) return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
} }
@ -691,8 +691,8 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
date := uint64(timestampFromTime(time.Now())) / msecPerDay date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids { for i := range tsids {
tsid := &tsids[i] tsid := &tsids[i]
if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil {
return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d): %w", date, tsid.MetricID, err) return nil, nil, fmt.Errorf("error in createPerDayIndexes(%d, %d): %w", date, tsid.MetricID, err)
} }
} }
@ -1662,7 +1662,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
metricNameBuf = mn.Marshal(metricNameBuf[:0]) metricNameBuf = mn.Marshal(metricNameBuf[:0])
var tsid TSID var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err) t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
} }
mns = append(mns, mn) mns = append(mns, mn)
@ -1675,8 +1675,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
for i := range tsids { for i := range tsids {
tsid := &tsids[i] tsid := &tsids[i]
metricIDs.Add(tsid.MetricID) metricIDs.Add(tsid.MetricID)
if err := is.storeDateMetricID(date, tsid.MetricID, &mns[i]); err != nil { if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil {
t.Fatalf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) t.Fatalf("error in createPerDayIndexes(%d, %d): %s", date, tsid.MetricID, err)
} }
} }
allMetricIDs.Union(&metricIDs) allMetricIDs.Union(&metricIDs)

View file

@ -93,7 +93,7 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
} }
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(metricName[:0]) metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(tsid, metricName); err != nil { if err := is.GetOrCreateTSIDByName(tsid, metricName, 0); err != nil {
panic(fmt.Errorf("cannot insert record: %w", err)) panic(fmt.Errorf("cannot insert record: %w", err))
} }
} }
@ -122,6 +122,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
var mn MetricName var mn MetricName
var metricName []byte var metricName []byte
var tsid TSID var tsid TSID
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
addSeries := func(kvs ...string) { addSeries := func(kvs ...string) {
mn.Reset() mn.Reset()
for i := 0; i < len(kvs); i += 2 { for i := 0; i < len(kvs); i += 2 {
@ -129,20 +131,20 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
} }
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(metricName[:0]) metricName = mn.Marshal(metricName[:0])
if err := db.createTSIDByName(&tsid, metricName); err != nil { if err := is.createTSIDByName(&tsid, metricName, 0); err != nil {
b.Fatalf("cannot insert record: %s", err) b.Fatalf("cannot insert record: %s", err)
} }
} }
for n := 0; n < 10; n++ { for n := 0; n < 10; n++ {
ns := strconv.Itoa(n) ns := strconv.Itoa(n)
for i := 0; i < 100000; i++ { for i := 0; i < 100000; i++ {
is := strconv.Itoa(i) ix := strconv.Itoa(i)
addSeries("i", is, "n", ns, "j", "foo") addSeries("i", ix, "n", ns, "j", "foo")
// Have some series that won't be matched, to properly test inverted matches. // Have some series that won't be matched, to properly test inverted matches.
addSeries("i", is, "n", ns, "j", "bar") addSeries("i", ix, "n", ns, "j", "bar")
addSeries("i", is, "n", "0_"+ns, "j", "bar") addSeries("i", ix, "n", "0_"+ns, "j", "bar")
addSeries("i", is, "n", "1_"+ns, "j", "bar") addSeries("i", ix, "n", "1_"+ns, "j", "bar")
addSeries("i", is, "n", "2_"+ns, "j", "foo") addSeries("i", ix, "n", "2_"+ns, "j", "foo")
} }
} }
@ -313,7 +315,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
for i := 0; i < recordsCount; i++ { for i := 0; i < recordsCount; i++ {
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(metricName[:0]) metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil { if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err != nil {
b.Fatalf("cannot insert record: %s", err) b.Fatalf("cannot insert record: %s", err)
} }
} }
@ -331,7 +333,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
for i := 0; i < recordsPerLoop; i++ { for i := 0; i < recordsPerLoop; i++ {
mnLocal.sortTags() mnLocal.sortTags()
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0]) metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal); err != nil { if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, 0); err != nil {
panic(fmt.Errorf("cannot obtain tsid: %w", err)) panic(fmt.Errorf("cannot obtain tsid: %w", err))
} }
} }

View file

@ -366,7 +366,7 @@ func (s *Storage) CreateSnapshot() (string, error) {
srcMetadataDir := srcDir + "/metadata" srcMetadataDir := srcDir + "/metadata"
dstMetadataDir := dstDir + "/metadata" dstMetadataDir := dstDir + "/metadata"
if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil { if err := fs.CopyDirectory(srcMetadataDir, dstMetadataDir); err != nil {
return "", fmt.Errorf("cannot copy metadata: %s", err) return "", fmt.Errorf("cannot copy metadata: %w", err)
} }
fs.MustSyncPath(dstDir) fs.MustSyncPath(dstDir)
@ -1666,10 +1666,7 @@ var (
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. // The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// Th MetricRow.Value field is ignored. // Th MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
var ( var metricName []byte
metricName []byte
)
var genTSID generationTSID var genTSID generationTSID
mn := GetMetricName() mn := GetMetricName()
defer PutMetricName(mn) defer PutMetricName(mn)
@ -1680,67 +1677,35 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
for i := range mrs { for i := range mrs {
mr := &mrs[i] mr := &mrs[i]
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if genTSID.generation != idb.generation { if genTSID.generation == idb.generation {
// The found entry is from the previous cache generation // Fast path - mr.MetricNameRaw has been already registered in the current idb.
// so attempt to re-populate the current generation with this entry. continue
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw)
if err != nil {
return fmt.Errorf("cannot create indexes in the current indexdb: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
} }
// Fast path - mr.MetricNameRaw has been already registered.
continue
} }
// Slow path - register mr.MetricNameRaw. // Slow path - register mr.MetricNameRaw.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot register the metric because cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
} }
mn.sortTags() mn.sortTags()
metricName = mn.Marshal(metricName[:0]) metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName); err != nil { date := uint64(mr.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, date); err != nil {
if errors.Is(err, errSeriesCardinalityExceeded) { if errors.Is(err, errSeriesCardinalityExceeded) {
continue continue
} }
return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err) return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)
} }
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw) s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
// Register the metric in per-day inverted index.
date := uint64(mr.Timestamp) / msecPerDay
metricID := genTSID.TSID.MetricID
if s.dateMetricIDCache.Has(date, metricID) {
// Fast path: the metric has been already registered in per-day inverted index
continue
}
// Slow path: acutally register the metric in per-day inverted index.
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
return fmt.Errorf("cannot register the metric in per-date inverted index because of error when locating (date=%d, metricID=%d) in database: %w",
date, metricID, err)
}
if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there.
if err := is.storeDateMetricID(date, metricID, mn); err != nil {
return fmt.Errorf("cannot register the metric in per-date inverted index because of error when storing (date=%d, metricID=%d) in database: %w",
date, metricID, err)
}
}
// The metric must be added to cache only after it has been successfully added to indexDB.
s.dateMetricIDCache.Set(date, metricID)
} }
return nil return nil
} }
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error { func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb() idb := s.idb()
j := 0 is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
var ( var (
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
prevTSID TSID prevTSID TSID
@ -1753,6 +1718,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
// Return only the first error, since it has no sense in returning all errors. // Return only the first error, since it has no sense in returning all errors.
var firstWarn error var firstWarn error
j := 0
for i := range mrs { for i := range mrs {
mr := &mrs[i] mr := &mrs[i]
if math.IsNaN(mr.Value) { if math.IsNaN(mr.Value) {
@ -1805,16 +1771,18 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
prevMetricNameRaw = mr.MetricNameRaw prevMetricNameRaw = mr.MetricNameRaw
if genTSID.generation != idb.generation { if genTSID.generation != idb.generation {
// The found entry is from the previous cache generation // The found entry is from the previous cache generation,
// so attempt to re-populate the current generation with this entry. // so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw) date := uint64(r.Timestamp) / msecPerDay
created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date)
if err != nil { if err != nil {
return fmt.Errorf("cannot create indexes in the current indexdb: %w", err) return fmt.Errorf("cannot create indexes: %w", err)
} }
if created { if created {
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw) s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
} }
} }
continue continue
@ -1842,7 +1810,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
sort.Slice(pendingMetricRows, func(i, j int) bool { sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName) return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
}) })
is := idb.getIndexSearch(noDeadline)
prevMetricNameRaw = nil prevMetricNameRaw = nil
var slowInsertsCount uint64 var slowInsertsCount uint64
for i := range pendingMetricRows { for i := range pendingMetricRows {
@ -1861,7 +1828,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue continue
} }
slowInsertsCount++ slowInsertsCount++
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil { date := uint64(r.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, date); err != nil {
j-- j--
if errors.Is(err, errSeriesCardinalityExceeded) { if errors.Is(err, errSeriesCardinalityExceeded) {
continue continue
@ -1877,11 +1845,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
genTSID.generation = idb.generation genTSID.generation = idb.generation
genTSID.TSID = r.TSID genTSID.TSID = r.TSID
s.putTSIDToCache(&genTSID, mr.MetricNameRaw) s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
prevTSID = r.TSID prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw prevMetricNameRaw = mr.MetricNameRaw
} }
idb.putIndexSearch(is)
putPendingMetricRows(pmrs) putPendingMetricRows(pmrs)
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
} }
@ -1891,15 +1859,17 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
dstMrs = dstMrs[:j] dstMrs = dstMrs[:j]
rows = rows[:j] rows = rows[:j]
var firstError error err := s.updatePerDateData(rows, dstMrs)
if err := s.tb.AddRows(rows); err != nil { if err != nil {
firstError = fmt.Errorf("cannot add rows to table: %w", err) err = fmt.Errorf("cannot update per-date data: %w", err)
} else {
err = s.tb.AddRows(rows)
if err != nil {
err = fmt.Errorf("cannot add rows to table: %w", err)
}
} }
if err := s.updatePerDateData(rows, dstMrs); err != nil && firstError == nil { if err != nil {
firstError = fmt.Errorf("cannot update per-date data: %w", err) return fmt.Errorf("error occurred during rows addition: %w", err)
}
if firstError != nil {
return fmt.Errorf("error occurred during rows addition: %w", firstError)
} }
return nil return nil
} }
@ -2125,7 +2095,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue continue
} }
if !ok { if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there. // The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
// It is OK if the (date, metricID) entry is added multiple times to db // It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines. // by concurrent goroutines.
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil { if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
@ -2135,9 +2105,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue continue
} }
mn.sortTags() mn.sortTags()
if err := is.storeDateMetricID(date, metricID, mn); err != nil { if err := is.createPerDayIndexes(date, metricID, mn); err != nil {
if firstError == nil { if firstError == nil {
firstError = fmt.Errorf("error when storing (date=%d, metricID=%d) in database: %w", date, metricID, err) firstError = fmt.Errorf("error when storing per-date inverted index for (date=%d, metricID=%d): %w", date, metricID, err)
} }
continue continue
} }