diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2ea0d1391..f26f5464f 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -467,12 +467,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(idbm().PartsRefCount) }) - metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 { - return float64(idbm().NewTimeseriesCreated) - }) - metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 { - return float64(idbm().TimeseriesRepopulated) - }) metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 { return float64(idbm().MissingTSIDsForMetricID) }) @@ -587,6 +581,12 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().TooSmallTimestampRows) }) + metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 { + return float64(m().TimeseriesRepopulated) + }) + metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 { + return float64(m().NewTimeseriesCreated) + }) metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) }) diff --git a/app/vmstorage/servers/vmselect.go b/app/vmstorage/servers/vmselect.go index 97a8f5d12..27a868ce1 100644 --- a/app/vmstorage/servers/vmselect.go +++ b/app/vmstorage/servers/vmselect.go @@ -156,7 +156,8 @@ func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.Search } func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error { - return api.s.RegisterMetricNames(qt, mrs) + api.s.RegisterMetricNames(qt, mrs) + return nil } func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9876c9381..f9fd3e0f7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,7 +27,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html). * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved). - +* FETURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). The change significantly reduces the size of `indexdb/dataBlocks` cache for such setups. The cache size can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_cache_size_bytes{type="indexdb/dataBlocks"}` metric. * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402). diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 3cd1f7001..93f6a0c3d 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -31,7 +31,17 @@ import ( const ( // Prefix for MetricName->TSID entries. - nsPrefixMetricNameToTSID = 0 + // + // This index was substituted with nsPrefixDateMetricNameToTSID, + // since the MetricName->TSID index may require big amounts of memory for indexdb/dataBlocks cache + // when it grows big on the configured retention under high churn rate + // (e.g. when new time series are constantly registered). + // + // It is much more efficient from memory usage PoV to query per-day MetricName->TSID index + // (aka nsPrefixDateMetricNameToTSID) when the TSID must be obtained for the given MetricName + // during data ingestion under high churn rate and big retention. + // + // nsPrefixMetricNameToTSID = 0 // Prefix for Tag->MetricID entries. nsPrefixTagToMetricIDs = 1 @@ -50,6 +60,9 @@ const ( // Prefix for (Date,Tag)->MetricID entries. nsPrefixDateTagToMetricIDs = 6 + + // Prefix for (Date,MetricName)->TSID entries. + nsPrefixDateMetricNameToTSID = 7 ) // indexDB represents an index db. @@ -59,12 +72,6 @@ type indexDB struct { refCount uint64 - // The counter for newly created time series. It can be used for determining time series churn rate. - newTimeseriesCreated uint64 - - // The counter for time series which were re-populated from previous indexDB after the rotation. - timeseriesRepopulated uint64 - // The number of missing MetricID -> TSID entries. // High rate for this value means corrupted indexDB. missingTSIDsForMetricID uint64 @@ -177,8 +184,6 @@ type IndexDBMetrics struct { IndexDBRefCount uint64 - NewTimeseriesCreated uint64 - TimeseriesRepopulated uint64 MissingTSIDsForMetricID uint64 RecentHourMetricIDsSearchCalls uint64 @@ -219,8 +224,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) - m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated) - m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) @@ -385,33 +388,6 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) { db.s.metricNameCache.Set(key[:], metricName) } -// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db. -// -// The probability increases from 0 to 100% during the first hour since db rotation. -// -// It returns true if new index entry was created, and false if it was skipped. -func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) { - pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600 - if pMin < 1 { - p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32) - if p > pMin { - // Fast path: there is no need creating indexes for metricNameRaw yet. - return false, nil - } - } - // Slow path: create indexes for (tsid, metricNameRaw) at db. - mn := GetMetricName() - if err := mn.UnmarshalRaw(metricNameRaw); err != nil { - return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err) - } - mn.sortTags() - is.createGlobalIndexes(tsid, mn) - is.createPerDayIndexes(date, tsid.MetricID, mn) - PutMetricName(mn) - atomic.AddUint64(&is.db.timeseriesRepopulated, 1) - return true, nil -} - func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte { prefix := ^uint64(0) if versioned { @@ -486,25 +462,28 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) { return dst, nil } -// getTSIDByNameNoCreate fills the dst with TSID for the given metricName. +// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date. // -// It returns io.EOF if the given mn isn't found locally. -func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error { - is := db.getIndexSearch(0, 0, noDeadline) - err := is.getTSIDByMetricName(dst, metricName) - db.putIndexSearch(is) - if err == nil { - return nil - } - if err != io.EOF { - return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err) +// It returns false if the given metricName isn't found in the indexdb. +func (is *indexSearch) getTSIDByMetricName(dst *generationTSID, metricName []byte, date uint64) bool { + if is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) { + // Fast path - the TSID is found in the current indexdb. + dst.generation = is.db.generation + return true } - // Do not search for the TSID in the external storage, - // since this function is already called by another indexDB instance. - - // The TSID for the given mn wasn't found. - return io.EOF + // Slow path - search for the TSID in the previous indexdb + ok := false + deadline := is.deadline + is.db.doExtDB(func(extDB *indexDB) { + is := extDB.getIndexSearch(0, 0, deadline) + ok = is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) + extDB.putIndexSearch(is) + if ok { + dst.generation = extDB.generation + } + }) + return ok } type indexSearch struct { @@ -518,55 +497,6 @@ type indexSearch struct { // deadline in unix timestamp seconds for the given search. deadline uint64 - - // tsidByNameMisses and tsidByNameSkips is used for a performance - // hack in GetOrCreateTSIDByName. See the comment there. - tsidByNameMisses int - tsidByNameSkips int -} - -// GetOrCreateTSIDByName fills the dst with TSID for the given metricName. -// -// It also registers the metricName in global and per-day indexes -// for the given date if the metricName->TSID entry is missing in the index. -func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error { - // A hack: skip searching for the TSID after many serial misses. - // This should improve insertion performance for big batches - // of new time series. - if is.tsidByNameMisses < 100 { - err := is.getTSIDByMetricName(dst, metricName) - if err == nil { - // Fast path - the TSID for the given metricName has been found in the index. - is.tsidByNameMisses = 0 - if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil { - return err - } - // There is no need in checking whether the TSID is present in the per-day index for the given date, - // since this check must be performed by the caller in an optimized way. - // See storage.updatePerDateData() function. - return nil - } - if err != io.EOF { - userReadableMetricName := getUserReadableMetricName(metricNameRaw) - return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err) - } - is.tsidByNameMisses++ - } else { - is.tsidByNameSkips++ - if is.tsidByNameSkips > 10000 { - is.tsidByNameSkips = 0 - is.tsidByNameMisses = 0 - } - } - - // TSID for the given name wasn't found. Create it. - // It is OK if duplicate TSID for mn is created by concurrent goroutines. - // Metric results will be merged by mn after TableSearch. - if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil { - userReadableMetricName := getUserReadableMetricName(metricNameRaw) - return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err) - } - return nil } func (db *indexDB) getIndexSearch(accountID, projectID uint32, deadline uint64) *indexSearch { @@ -592,75 +522,9 @@ func (db *indexDB) putIndexSearch(is *indexSearch) { is.projectID = 0 is.deadline = 0 - // Do not reset tsidByNameMisses and tsidByNameSkips, - // since they are used in GetOrCreateTSIDByName across call boundaries. - db.indexSearchPool.Put(is) } -func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error { - mn := GetMetricName() - defer PutMetricName(mn) - if err := mn.Unmarshal(metricName); err != nil { - return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err) - } - - created, err := is.db.getOrCreateTSID(dst, metricName, mn) - if err != nil { - return fmt.Errorf("cannot generate TSID: %w", err) - } - if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil { - return err - } - is.createGlobalIndexes(dst, mn) - is.createPerDayIndexes(date, dst.MetricID, mn) - - // There is no need in invalidating tag cache, since it is invalidated - // on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable. - - if created { - // Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB - atomic.AddUint64(&is.db.newTimeseriesCreated, 1) - if logNewSeries { - logger.Infof("new series created: %s", mn.String()) - } - } - return nil -} - -// SetLogNewSeries updates new series logging. -// -// This function must be called before any calling any storage functions. -func SetLogNewSeries(ok bool) { - logNewSeries = ok -} - -var logNewSeries = false - -// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found. -// -// Returns true if TSID was created or false if TSID was in extDB -func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) { - // Search the TSID in the external storage. - // This is usually the db from the previous period. - var err error - if db.doExtDB(func(extDB *indexDB) { - err = extDB.getTSIDByNameNoCreate(dst, metricName) - }) { - if err == nil { - // The TSID has been found in the external storage. - return false, nil - } - if err != io.EOF { - return false, fmt.Errorf("external search failed: %w", err) - } - } - // The TSID wasn't found in the external storage. - // Generate it locally. - generateTSID(dst, mn) - return true, nil -} - func generateTSID(dst *TSID, mn *MetricName) { dst.AccountID = mn.AccountID dst.ProjectID = mn.ProjectID @@ -686,13 +550,6 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) { ii := getIndexItems() defer putIndexItems(ii) - // Create MetricName -> TSID index. - ii.B = append(ii.B, nsPrefixMetricNameToTSID) - ii.B = mn.Marshal(ii.B) - ii.B = append(ii.B, kvSeparatorChar) - ii.B = tsid.Marshal(ii.B) - ii.Next() - // Create MetricID -> MetricName index. ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID) ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID) @@ -2089,26 +1946,28 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj var tagFiltersKeyBufPool bytesutil.ByteBufferPool -func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { +func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte, date uint64) bool { dmis := is.db.s.getDeletedMetricIDs() ts := &is.ts kb := &is.kb - kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) + // Do not use marshalCommonPrefix() here, since mn already contains (AccountID, ProjectID) + kb.B = append(kb.B, nsPrefixDateMetricNameToTSID) + kb.B = encoding.MarshalUint64(kb.B, date) kb.B = append(kb.B, metricName...) kb.B = append(kb.B, kvSeparatorChar) ts.Seek(kb.B) for ts.NextItem() { if !bytes.HasPrefix(ts.Item, kb.B) { // Nothing found. - return io.EOF + return false } v := ts.Item[len(kb.B):] tail, err := dst.Unmarshal(v) if err != nil { - return fmt.Errorf("cannot unmarshal TSID: %w", err) + logger.Panicf("FATAL: cannot unmarshal TSID: %s", err) } if len(tail) > 0 { - return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail) + logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling TSID: %X", tail) } if dmis.Len() > 0 { // Verify whether the dst is marked as deleted. @@ -2118,13 +1977,13 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error { } } // Found valid dst. - return nil + return true } if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err) + logger.Panicf("FATAL: error when searching TSID by metricName; searchPrefix %q: %s", kb.B, err) } // Nothing found - return io.EOF + return false } func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) { @@ -2975,13 +2834,23 @@ const ( int64Max = int64((1 << 63) - 1) ) -func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) { +func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) { ii := getIndexItems() defer putIndexItems(ii) ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID, mn.AccountID, mn.ProjectID) ii.B = encoding.MarshalUint64(ii.B, date) - ii.B = encoding.MarshalUint64(ii.B, metricID) + ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID) + ii.Next() + + // Create per-day inverted index entries for TSID. + // + // Do not use marshalCommonPrefix() here, since mn already contains (AccountID, ProjectID) + ii.B = append(ii.B, nsPrefixDateMetricNameToTSID) + ii.B = encoding.MarshalUint64(ii.B, date) + ii.B = mn.Marshal(ii.B) + ii.B = append(ii.B, kvSeparatorChar) + ii.B = tsid.Marshal(ii.B) ii.Next() // Create per-day inverted index entries for metricID. @@ -2989,9 +2858,8 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName defer kbPool.Put(kb) kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, mn.AccountID, mn.ProjectID) kb.B = encoding.MarshalUint64(kb.B, date) - ii.registerTagIndexes(kb.B, mn, metricID) + ii.registerTagIndexes(kb.B, mn, tsid.MetricID) is.db.tb.AddItems(ii.Items) - is.db.s.dateMetricIDCache.Set(date, metricID) } func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) { @@ -3101,22 +2969,24 @@ func reverseBytes(dst, src []byte) []byte { return dst } -func (is *indexSearch) hasDateMetricID(date, metricID uint64, accountID, projectID uint32) (bool, error) { +func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64, accountID, projectID uint32) bool { ts := &is.ts kb := &is.kb kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID) kb.B = encoding.MarshalUint64(kb.B, date) kb.B = encoding.MarshalUint64(kb.B, metricID) - if err := ts.FirstItemWithPrefix(kb.B); err != nil { - if err == io.EOF { - return false, nil + err := ts.FirstItemWithPrefix(kb.B) + if err == nil { + if string(ts.Item) != string(kb.B) { + logger.Panicf("FATAL: unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B) } - return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err) + // Fast path - the (date, metricID) entry is found in the current indexdb. + return true } - if string(ts.Item) != string(kb.B) { - return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B) + if err != io.EOF { + logger.Panicf("FATAL: unexpected error when searching for (date=%s, metricID=%d) entry: %s", dateToString(date), metricID, err) } - return true, nil + return false } func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte, diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 3a84b32a9..0c3ab2bba 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -507,12 +507,11 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) { } func TestIndexDBOpenClose(t *testing.T) { - s := newTestStorage() - defer stopTestStorage(s) + var s Storage tableName := nextIndexDBTableName() for i := 0; i < 5; i++ { var isReadOnly uint32 - db := mustOpenIndexDB(tableName, s, 0, &isReadOnly) + db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly) db.MustClose() } if err := os.RemoveAll(tableName); err != nil { @@ -526,19 +525,10 @@ func TestIndexDB(t *testing.T) { const metricGroups = 10 t.Run("serial", func(t *testing.T) { - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - t.Fatalf("cannot remove indexDB: %s", err) - } - }() + const path = "TestIndexDB-serial" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() mns, tsids, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups) if err != nil { t.Fatalf("unexpected error: %s", err) @@ -547,27 +537,23 @@ func TestIndexDB(t *testing.T) { t.Fatalf("unexpected error: %s", err) } - // Re-open the db and verify it works as expected. - db.MustClose() - db = mustOpenIndexDB(dbName, s, 0, &isReadOnly) + // Re-open the storage and verify it works as expected. + s.MustClose() + s = MustOpenStorage(path, maxRetentionMsecs, 0, 0) + + db = s.idb() if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil { t.Fatalf("unexpected error: %s", err) } + + s.MustClose() + fs.MustRemoveAll(path) }) t.Run("concurrent", func(t *testing.T) { - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - t.Fatalf("cannot remove indexDB: %s", err) - } - }() + const path = "TestIndexDB-concurrent" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { @@ -584,20 +570,20 @@ func TestIndexDB(t *testing.T) { ch <- nil }() } - var errors []error + deadlineCh := time.After(30 * time.Second) for i := 0; i < cap(ch); i++ { select { case err := <-ch: if err != nil { - errors = append(errors, fmt.Errorf("unexpected error: %w", err)) + t.Fatalf("unexpected error: %s", err) } - case <-time.After(30 * time.Second): + case <-deadlineCh: t.Fatalf("timeout") } } - if len(errors) > 0 { - t.Fatal(errors[0]) - } + + s.MustClose() + fs.MustRemoveAll(path) }) } @@ -610,11 +596,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, tenants := make(map[string]struct{}) is := db.getIndexSearch(0, 0, noDeadline) - defer db.putIndexSearch(is) + + date := uint64(timestampFromTime(time.Now())) / msecPerDay var metricNameBuf []byte var metricNameRawBuf []byte - for i := 0; i < 4e2+1; i++ { + for i := 0; i < 401; i++ { var mn MetricName mn.AccountID = uint32((i + 2) % accountsCount) mn.ProjectID = uint32((i + 1) % projectsCount) @@ -636,30 +623,26 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0]) // Create tsid for the metricName. - var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil { - return nil, nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err) + var genTSID generationTSID + if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { + generateTSID(&genTSID.TSID, &mn) + genTSID.generation = db.generation + db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date) } - if tsid.AccountID != mn.AccountID { - return nil, nil, nil, fmt.Errorf("unexpected TSID.AccountID; got %d; want %d; mn:\n%s\ntsid:\n%+v", tsid.AccountID, mn.AccountID, &mn, &tsid) + if genTSID.TSID.AccountID != mn.AccountID { + return nil, nil, nil, fmt.Errorf("unexpected TSID.AccountID; got %d; want %d; mn:\n%s\ntsid:\n%+v", genTSID.TSID.AccountID, mn.AccountID, &mn, &genTSID.TSID) } - if tsid.ProjectID != mn.ProjectID { - return nil, nil, nil, fmt.Errorf("unexpected TSID.ProjectID; got %d; want %d; mn:\n%s\ntsid:\n%+v", tsid.ProjectID, mn.ProjectID, &mn, &tsid) + if genTSID.TSID.ProjectID != mn.ProjectID { + return nil, nil, nil, fmt.Errorf("unexpected TSID.ProjectID; got %d; want %d; mn:\n%s\ntsid:\n%+v", genTSID.TSID.ProjectID, mn.ProjectID, &mn, &genTSID.TSID) } mns = append(mns, mn) - tsids = append(tsids, tsid) - } - - // fill Date -> MetricID cache - date := uint64(timestampFromTime(time.Now())) / msecPerDay - for i := range tsids { - tsid := &tsids[i] - is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) + tsids = append(tsids, genTSID.TSID) } + db.putIndexSearch(is) // Flush index to disk, so it becomes visible for search - db.tb.DebugFlush() + db.s.DebugFlush() var tenantsList []string for tenant := range tenants { @@ -679,9 +662,10 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten return false } + currentTime := timestampFromTime(time.Now()) allLabelNames := make(map[accountProjectKey]map[string]bool) timeseriesCounters := make(map[accountProjectKey]map[uint64]bool) - var tsidCopy TSID + var genTSID generationTSID var metricNameCopy []byte for i := range mns { mn := &mns[i] @@ -701,26 +685,29 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten mn.sortTags() metricName := mn.Marshal(nil) - if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil { - return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err) + is := db.getIndexSearch(0, 0, noDeadline) + if !is.getTSIDByMetricName(&genTSID, metricName, uint64(currentTime)/msecPerDay) { + return fmt.Errorf("cannot obtain tsid #%d for mn %s", i, mn) } + db.putIndexSearch(is) + if isConcurrent { // Copy tsid.MetricID, since multiple TSIDs may match // the same mn in concurrent mode. - tsidCopy.MetricID = tsid.MetricID + genTSID.TSID.MetricID = tsid.MetricID } - if !reflect.DeepEqual(tsid, &tsidCopy) { - return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &tsidCopy, tsid) + if !reflect.DeepEqual(tsid, &genTSID.TSID) { + return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &genTSID.TSID, tsid) } // Search for metric name for the given metricID. var err error - metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID, tsidCopy.AccountID, tsidCopy.ProjectID) + metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID, genTSID.TSID.AccountID, genTSID.TSID.ProjectID) if err != nil { - return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err) + return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", genTSID.TSID.MetricID, i, err) } if !bytes.Equal(metricName, metricNameCopy) { - return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName) + return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", genTSID.TSID.MetricID, metricNameCopy, metricName) } // Try searching metric name for non-existent MetricID. @@ -785,7 +772,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } // Test SearchTenants on specific time range - currentTime := timestampFromTime(time.Now()) tr := TimeRange{ MinTimestamp: currentTime - msecPerDay, MaxTimestamp: currentTime + msecPerDay, @@ -815,10 +801,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten } // Try tag filters. - tr = TimeRange{ - MinTimestamp: currentTime - msecPerDay, - MaxTimestamp: currentTime + msecPerDay, - } for i := range mns { mn := &mns[i] tsid := &tsids[i] @@ -1557,12 +1539,6 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { r := rand.New(rand.NewSource(1)) path := "TestIndexRepopulateAfterRotation" s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5) - defer func() { - s.MustClose() - if err := os.RemoveAll(path); err != nil { - t.Fatalf("cannot remove %q: %s", path, err) - } - }() db := s.idb() if db.generation == 0 { @@ -1570,9 +1546,11 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { } const metricRowsN = 1000 - // use min-max timestamps of 1month range to create smaller number of partitions - timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now() - mrs := testGenerateMetricRows(r, metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli()) + + currentDayTimestamp := (time.Now().UnixMilli() / msecPerDay) * msecPerDay + timeMin := currentDayTimestamp - 24*3600*1000 + timeMax := currentDayTimestamp + 24*3600*1000 + mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax) if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { t.Fatalf("unexpected error when adding mrs: %s", err) } @@ -1586,7 +1564,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { } // check new series were registered in indexDB - added := atomic.LoadUint64(&db.newTimeseriesCreated) + added := atomic.LoadUint64(&db.s.newTimeseriesCreated) if added != metricRowsN { t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added) } @@ -1626,50 +1604,34 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", dbNew.generation) } - // Re-insert rows again and verify that entries belong prevGeneration and dbNew.generation, - // while the majority of entries remain at prevGeneration. + // Re-insert rows again and verify that all the entries belong to new generation if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { t.Fatalf("unexpected error when adding mrs: %s", err) } s.DebugFlush() - entriesByGeneration := make(map[uint64]int) + for _, mr := range mrs { s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) - entriesByGeneration[genTSID.generation]++ + if genTSID.generation != dbNew.generation { + t.Fatalf("unexpected generation for data after rotation; got %d; want %d", genTSID.generation, dbNew.generation) + } } - if len(entriesByGeneration) > 2 { - t.Fatalf("expecting two generations; got %d", entriesByGeneration) - } - prevEntries := entriesByGeneration[prevGeneration] - currEntries := entriesByGeneration[dbNew.generation] - totalEntries := prevEntries + currEntries - if totalEntries != metricRowsN { - t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN) - } - if float64(currEntries)/float64(totalEntries) > 0.1 { - t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries) + + s.MustClose() + if err := os.RemoveAll(path); err != nil { + t.Fatalf("cannot remove %q: %s", path, err) } } func TestSearchTSIDWithTimeRange(t *testing.T) { - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - t.Fatalf("cannot remove indexDB: %s", err) - } - }() + const path = "TestSearchTSIDWithTimeRange" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() // Create a bunch of per-day time series const accountID = 12345 const projectID = 85453 is := db.getIndexSearch(accountID, projectID, noDeadline) - defer db.putIndexSearch(is) const days = 5 const metricsPerDay = 1000 theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC) @@ -1687,8 +1649,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { } sort.Strings(labelNames) for day := 0; day < days; day++ { - var tsids []TSID - var mns []MetricName + date := baseDate - uint64(day) + var metricIDs uint64set.Set for metric := 0; metric < metricsPerDay; metric++ { var mn MetricName mn.AccountID = accountID @@ -1714,37 +1676,30 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { metricNameBuf = mn.Marshal(metricNameBuf[:0]) metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0]) - var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil { - t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err) + var genTSID generationTSID + if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { + generateTSID(&genTSID.TSID, &mn) + genTSID.generation = db.generation + db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date) } - if tsid.AccountID != accountID { - t.Fatalf("unexpected accountID; got %d; want %d", tsid.AccountID, accountID) + if genTSID.TSID.AccountID != accountID { + t.Fatalf("unexpected accountID; got %d; want %d", genTSID.TSID.AccountID, accountID) } - if tsid.ProjectID != projectID { - t.Fatalf("unexpected accountID; got %d; want %d", tsid.ProjectID, projectID) + if genTSID.TSID.ProjectID != projectID { + t.Fatalf("unexpected accountID; got %d; want %d", genTSID.TSID.ProjectID, projectID) } - mns = append(mns, mn) - tsids = append(tsids, tsid) + metricIDs.Add(genTSID.TSID.MetricID) } - // Add the metrics to the per-day stores - date := baseDate - uint64(day) - var metricIDs uint64set.Set - for i := range tsids { - tsid := &tsids[i] - metricIDs.Add(tsid.MetricID) - is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) - } allMetricIDs.Union(&metricIDs) perDayMetricIDs[date] = &metricIDs } + db.putIndexSearch(is) // Flush index to disk, so it becomes visible for search - db.tb.DebugFlush() + s.DebugFlush() is2 := db.getIndexSearch(accountID, projectID, noDeadline) - defer db.putIndexSearch(is2) // Check that all the metrics are found for all the days. for date := baseDate - days + 1; date <= baseDate; date++ { @@ -1765,6 +1720,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if !allMetricIDs.Equal(metricIDs) { t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil)) } + db.putIndexSearch(is2) // Check SearchLabelNamesWithFiltersOnTimeRange with the specified time range. tr := TimeRange{ @@ -2110,6 +2066,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { if status.TotalLabelValuePairs != expectedLabelValuePairs { t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs) } + + s.MustClose() + fs.MustRemoveAll(path) } func toTFPointers(tfs []tagFilter) []*tagFilter { @@ -2131,8 +2090,6 @@ func newTestStorage() *Storage { retentionMsecs: maxRetentionMsecs, } s.setDeletedMetricIDs(&uint64set.Set{}) - var idb *indexDB - s.idbCurr.Store(idb) return s } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 4b95960ba..a05bd3b3d 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -2,12 +2,13 @@ package storage import ( "fmt" - "os" "regexp" "strconv" "sync/atomic" "testing" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" ) func BenchmarkRegexpFilterMatch(b *testing.B) { @@ -39,21 +40,12 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) { } func BenchmarkIndexDBAddTSIDs(b *testing.B) { + const path = "BenchmarkIndexDBAddTSIDs" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() + const recordsPerLoop = 1e3 - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - b.Fatalf("cannot remove indexDB: %s", err) - } - }() - var goroutineID uint32 b.ReportAllocs() @@ -61,7 +53,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { b.ResetTimer() b.RunParallel(func(pb *testing.PB) { var mn MetricName - var tsid TSID + var genTSID generationTSID mn.AccountID = atomic.AddUint32(&goroutineID, 1) // The most common tags. @@ -76,15 +68,18 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { startOffset := 0 for pb.Next() { - benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop) + benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, startOffset, recordsPerLoop) startOffset += recordsPerLoop } }) b.StopTimer() + + s.MustClose() + fs.MustRemoveAll(path) } -func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) { - var metricName []byte +func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) { + date := uint64(0) var metricNameRaw []byte is := db.getIndexSearch(0, 0, noDeadline) defer db.putIndexSearch(is) @@ -94,39 +89,30 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs mn.Tags[j].Value = strconv.AppendUint(mn.Tags[j].Value[:0], uint64(i*j), 16) } mn.sortTags() - metricName = mn.Marshal(metricName[:0]) + metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) - if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil { - panic(fmt.Errorf("cannot insert record: %w", err)) - } + generateTSID(&genTSID.TSID, mn) + genTSID.generation = db.generation + db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date) } } func BenchmarkHeadPostingForMatchers(b *testing.B) { // This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52 // See https://www.robustperception.io/evaluating-performance-and-correctness for more details. - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - b.Fatalf("cannot remove indexDB: %s", err) - } - }() + const path = "BenchmarkHeadPostingForMatchers" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() // Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66 const accountID = 34327843 const projectID = 893433 - var mn MetricName - var metricName []byte - var metricNameRaw []byte - var tsid TSID is := db.getIndexSearch(0, 0, noDeadline) defer db.putIndexSearch(is) + var mn MetricName + var metricNameRaw []byte + var genTSID generationTSID + date := uint64(0) addSeries := func(kvs ...string) { mn.Reset() for i := 0; i < len(kvs); i += 2 { @@ -135,11 +121,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { mn.sortTags() mn.AccountID = accountID mn.ProjectID = projectID - metricName = mn.Marshal(metricName[:0]) metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) - if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil { - b.Fatalf("cannot insert record: %s", err) - } + generateTSID(&genTSID.TSID, &mn) + genTSID.generation = db.generation + db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date) } for n := 0; n < 10; n++ { ns := strconv.Itoa(n) @@ -155,12 +140,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { } // Make sure all the items can be searched. - db.tb.DebugFlush() + db.s.DebugFlush() b.ResetTimer() benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) { is := db.getIndexSearch(tfs.accountID, tfs.projectID, noDeadline) - defer db.putIndexSearch(is) tfss := []*TagFilters{tfs} tr := TimeRange{ MinTimestamp: 0, @@ -175,6 +159,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { b.Fatalf("unexpected metricIDs found; got %d; want %d", len(metricIDs), expectedMetricIDs) } } + db.putIndexSearch(is) } addTagFilter := func(tfs *TagFilters, key, value string, isNegative, isRegexp bool) { if err := tfs.Add([]byte(key), []byte(value), isNegative, isRegexp); err != nil { @@ -283,21 +268,15 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { addTagFilter(tfs, "j", "foo", false, false) benchSearch(b, tfs, 88889) }) + + s.MustClose() + fs.MustRemoveAll(path) } func BenchmarkIndexDBGetTSIDs(b *testing.B) { - s := newTestStorage() - defer stopTestStorage(s) - - dbName := nextIndexDBTableName() - var isReadOnly uint32 - db := mustOpenIndexDB(dbName, s, 0, &isReadOnly) - defer func() { - db.MustClose() - if err := os.RemoveAll(dbName); err != nil { - b.Fatalf("cannot remove indexDB: %s", err) - } - }() + const path = "BenchmarkIndexDBGetTSIDs" + s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + db := s.idb() const recordsPerLoop = 1000 const accountsCount = 111 @@ -312,33 +291,33 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { value := fmt.Sprintf("value_%d", i) mn.AddTag(key, value) } - var tsid TSID - var metricName []byte + var genTSID generationTSID var metricNameRaw []byte + date := uint64(0) is := db.getIndexSearch(0, 0, noDeadline) defer db.putIndexSearch(is) + for i := 0; i < recordsCount; i++ { mn.AccountID = uint32(i % accountsCount) mn.ProjectID = uint32(i % projectsCount) mn.sortTags() - metricName = mn.Marshal(metricName[:0]) - metricNameRaw = mn.marshalRaw(metricName[:0]) - if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil { - b.Fatalf("cannot insert record: %s", err) - } + metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) + generateTSID(&genTSID.TSID, &mn) + genTSID.generation = db.generation + db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date) } + db.s.DebugFlush() b.SetBytes(recordsPerLoop) b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { - var tsidLocal TSID + var genTSIDLocal generationTSID var metricNameLocal []byte var metricNameLocalRaw []byte mnLocal := mn is := db.getIndexSearch(0, 0, noDeadline) - defer db.putIndexSearch(is) for pb.Next() { for i := 0; i < recordsPerLoop; i++ { mnLocal.AccountID = uint32(i % accountsCount) @@ -346,11 +325,15 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { mnLocal.sortTags() metricNameLocal = mnLocal.Marshal(metricNameLocal[:0]) metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0]) - if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil { - panic(fmt.Errorf("cannot obtain tsid: %w", err)) + if !is.getTSIDByMetricName(&genTSIDLocal, metricNameLocal, date) { + panic(fmt.Errorf("cannot obtain tsid for row %d", i)) } } } + db.putIndexSearch(is) }) b.StopTimer() + + s.MustClose() + fs.MustRemoveAll(path) } diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 86d5bd81c..2f2d28527 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -172,14 +172,6 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt := mustCreatePartition(ptt, "small-table", "big-table", strg) smallPartsPath := pt.smallPartsPath bigPartsPath := pt.bigPartsPath - defer func() { - if err := os.RemoveAll("small-table"); err != nil { - t.Fatalf("cannot remove small parts directory: %s", err) - } - if err := os.RemoveAll("big-table"); err != nil { - t.Fatalf("cannot remove big parts directory: %s", err) - } - }() var tmpRows []rawRow for _, rows := range rowss { pt.AddRows(rows) @@ -194,6 +186,13 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg) testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected) pt.MustClose() + + if err := os.RemoveAll("small-table"); err != nil { + t.Fatalf("cannot remove small parts directory: %s", err) + } + if err := os.RemoveAll("big-table"); err != nil { + t.Fatalf("cannot remove big parts directory: %s", err) + } } func testPartitionSearch(t *testing.T, pt *partition, tsids []TSID, tr TimeRange, rbsExpected []rawBlock, rowsCountExpected int64) { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index b720af701..7af78760d 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2,7 +2,6 @@ package storage import ( "bytes" - "errors" "fmt" "io" "math" @@ -45,6 +44,8 @@ type Storage struct { tooSmallTimestampRows uint64 tooBigTimestampRows uint64 + timeseriesRepopulated uint64 + newTimeseriesCreated uint64 slowRowInserts uint64 slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 @@ -308,7 +309,11 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) { // since it may slow down data ingestion when used frequently. func (s *Storage) DebugFlush() { s.tb.flushPendingRows() - s.idb().tb.DebugFlush() + idb := s.idb() + idb.tb.DebugFlush() + idb.doExtDB(func(extDB *indexDB) { + extDB.tb.DebugFlush() + }) } // CreateSnapshot creates snapshot for s and returns the snapshot name. @@ -455,6 +460,8 @@ type Metrics struct { TooSmallTimestampRows uint64 TooBigTimestampRows uint64 + TimeseriesRepopulated uint64 + NewTimeseriesCreated uint64 SlowRowInserts uint64 SlowPerDayIndexInserts uint64 SlowMetricNameLoads uint64 @@ -524,6 +531,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) + m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated) + m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated) m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) @@ -738,8 +747,9 @@ func (s *Storage) mustRotateIndexDB() { // Persist changes on the file system. fs.MustSyncPath(s.path) - // Do not flush tsidCache to avoid read/write path slowdown - // and slowly re-populate new idb with entries from the cache via maybeCreateIndexes(). + // Do not flush tsidCache to avoid read/write path slowdown. + // The cache is automatically re-populated with new TSID entries + // with the updated indexdb generation. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 // Flush metric id caches for the current and the previous hour, @@ -753,7 +763,7 @@ func (s *Storage) mustRotateIndexDB() { // These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb. // 4. Stop adding new samples for these series just before 5 UTC. // 5. The next indexdb rotation is performed at 4 UTC next day. - // The information about the series from step 5 disappears from indexdb, since the old indexdb from step 1 is deleted, + // The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted, // while the current indexdb doesn't contain information about the series. // So queries for the last 24 hours stop returning samples added at step 3. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 @@ -830,7 +840,6 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry { name := "next_day_metric_ids" path := filepath.Join(s.cachePath, name) if !fs.IsPathExist(path) { - logger.Infof("nothing to load from %q", path) return e } src, err := os.ReadFile(path) @@ -870,7 +879,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs } path := filepath.Join(s.cachePath, name) if !fs.IsPathExist(path) { - logger.Infof("nothing to load from %q", path) return hm } src, err := os.ReadFile(path) @@ -1243,8 +1251,7 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int, if err != nil { return deletedCount, fmt.Errorf("cannot delete tsids: %w", err) } - // Do not reset MetricName->TSID cache in order to prevent from adding new data points - // to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs. + // Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs. // Do not reset MetricID->MetricName cache, since it must be used only // after filtering out deleted metricIDs. @@ -1639,61 +1646,129 @@ var metricRowsInsertCtxPool sync.Pool const maxMetricRowsPerBlock = 8000 -// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later. +// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later. // -// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. +// The the MetricRow.Timestamp is used for registering the metric name at the given day according to the timestamp. // Th MetricRow.Value field is ignored. -func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error { +func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { qt = qt.NewChild("registering %d series", len(mrs)) defer qt.Done() - var metricName []byte + var metricNameBuf []byte var genTSID generationTSID mn := GetMetricName() defer PutMetricName(mn) + var seriesRepopulated uint64 + idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) + var firstWarn error for i := range mrs { mr := &mrs[i] + date := uint64(mr.Timestamp) / msecPerDay if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { - if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil { + // Fast path - mr.MetricNameRaw has been already registered in the current idb. + if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) { + // Skip row, since it exceeds cardinality limit continue } - if genTSID.generation == idb.generation { - // Fast path - mr.MetricNameRaw has been already registered in the current idb. - continue + if genTSID.generation != idb.generation { + // The found TSID is from the previous indexdb. Create it in the current indexdb. + + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { + // Do not stop adding rows on error - just skip invalid row. + // This guarantees that invalid rows don't prevent + // from adding valid rows into the storage. + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + continue + } + mn.sortTags() + + genTSID.generation = idb.generation + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + seriesRepopulated++ } + continue } - // Slow path - register mr.MetricNameRaw. + + // Slow path - search TSID for the given metricName in indexdb. + + // Construct canonical metric name - it is used below. if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { - return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + // Do not stop adding rows on error - just skip invalid row. + // This guarantees that invalid rows don't prevent + // from adding valid rows into the storage. + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + continue } mn.sortTags() - metricName = mn.Marshal(metricName[:0]) - date := uint64(mr.Timestamp) / msecPerDay - if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil { - if errors.Is(err, errSeriesCardinalityExceeded) { + metricNameBuf = mn.Marshal(metricNameBuf[:0]) + + if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { + // Slower path - the TSID has been found in indexdb. + + if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) { + // Skip the row, since it exceeds the configured cardinality limit. continue } - return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err) + + if genTSID.generation != idb.generation { + // The found TSID is from the previous indexdb. Create it in the current indexdb. + genTSID.generation = idb.generation + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + seriesRepopulated++ + } else { + // Store the found TSID in the cache, so future rows for that TSID are ingested via fast path. + s.putTSIDToCache(&genTSID, mr.MetricNameRaw) + } + continue } + + // Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it. + generateTSID(&genTSID.TSID, mn) + + if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) { + // Skip the row, since it exceeds the configured cardinality limit. + continue + } + + // Schedule creating TSID indexes instead of creating them synchronously. + // This should keep stable the ingestion rate when new time series are ingested. genTSID.generation = idb.generation - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + } + + atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + + if firstWarn != nil { + logger.Warnf("cannot create some metrics: %s", firstWarn) } - return nil } func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error { idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) + + mn := GetMetricName() + defer PutMetricName(mn) + var ( // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. prevTSID TSID prevMetricNameRaw []byte ) - var pmrs *pendingMetricRows + var metricNameBuf []byte + + var slowInsertsCount uint64 + var newSeriesCount uint64 + var seriesRepopulated uint64 + minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps() var genTSID generationTSID @@ -1737,6 +1812,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci r.Timestamp = mr.Timestamp r.Value = mr.Value r.PrecisionBits = precisionBits + + // Search for TSID for the given mr.MetricNameRaw and store it at r.TSID. if string(mr.MetricNameRaw) == string(prevMetricNameRaw) { // Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID. // This path should trigger on bulk imports when many rows contain the same MetricNameRaw. @@ -1744,99 +1821,109 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci continue } if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { - if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil { + // Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted. + // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't + // contain MetricName->TSID entries for deleted time series. + // See Storage.DeleteSeries code for details. + + if !s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw) { + // Skip row, since it exceeds cardinality limit j-- continue } r.TSID = genTSID.TSID - // Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted. - // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't - // contain MetricName->TSID entries for deleted time series. - // See Storage.DeleteSeries code for details. prevTSID = r.TSID prevMetricNameRaw = mr.MetricNameRaw if genTSID.generation != idb.generation { - // The found entry is from the previous cache generation, - // so attempt to re-populate the current generation with this entry. - // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 + // The found TSID is from the previous indexdb. Create it in the current indexdb. date := uint64(r.Timestamp) / msecPerDay - created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date) - if err != nil { - return fmt.Errorf("cannot create indexes: %w", err) - } - if created { - genTSID.generation = idb.generation - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) - } - } - continue - } - // Slow path - the TSID is missing in the cache. - // Postpone its search in the loop below. - j-- - if pmrs == nil { - pmrs = getPendingMetricRows() - } - if err := pmrs.addRow(mr); err != nil { - // Do not stop adding rows on error - just skip invalid row. - // This guarantees that invalid rows don't prevent - // from adding valid rows into the storage. - if firstWarn == nil { - firstWarn = err - } - continue - } - } - if pmrs != nil { - // Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below. - pendingMetricRows := pmrs.pmrs - sort.Slice(pendingMetricRows, func(i, j int) bool { - return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName) - }) - prevMetricNameRaw = nil - var slowInsertsCount uint64 - for i := range pendingMetricRows { - pmr := &pendingMetricRows[i] - mr := pmr.mr - dstMrs[j] = mr - r := &rows[j] - j++ - r.Timestamp = mr.Timestamp - r.Value = mr.Value - r.PrecisionBits = precisionBits - if string(mr.MetricNameRaw) == string(prevMetricNameRaw) { - // Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID. - // This path should trigger on bulk imports when many rows contain the same MetricNameRaw. - r.TSID = prevTSID - continue - } - slowInsertsCount++ - date := uint64(r.Timestamp) / msecPerDay - if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil { - j-- - if errors.Is(err, errSeriesCardinalityExceeded) { + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + j-- continue } - // Do not stop adding rows on error - just skip invalid row. - // This guarantees that invalid rows don't prevent - // from adding valid rows into the storage. - if firstWarn == nil { - firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err) - } + mn.sortTags() + + genTSID.generation = idb.generation + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + seriesRepopulated++ + slowInsertsCount++ + } + continue + } + + // Slow path - the TSID for the given mr.MetricNameRaw is missing in the cache. + slowInsertsCount++ + + date := uint64(r.Timestamp) / msecPerDay + + // Construct canonical metric name - it is used below. + if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) + } + j-- + continue + } + mn.sortTags() + metricNameBuf = mn.Marshal(metricNameBuf[:0]) + + // Search for TSID for the given mr.MetricNameRaw in the indexdb. + if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { + // Slower path - the TSID has been found in indexdb. + + if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) { + // Skip the row, since it exceeds the configured cardinality limit. + j-- continue } - genTSID.generation = idb.generation - genTSID.TSID = r.TSID - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) - prevTSID = r.TSID + if genTSID.generation != idb.generation { + // The found TSID is from the previous indexdb. Create it in the current indexdb. + genTSID.generation = idb.generation + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + seriesRepopulated++ + } else { + // Store the found TSID in the cache, so future rows for that TSID are ingested via fast path. + s.putTSIDToCache(&genTSID, mr.MetricNameRaw) + } + + r.TSID = genTSID.TSID + prevTSID = genTSID.TSID prevMetricNameRaw = mr.MetricNameRaw + continue + } + + // Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it. + generateTSID(&genTSID.TSID, mn) + + if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) { + // Skip the row, since it exceeds the configured cardinality limit. + j-- + continue + } + + genTSID.generation = idb.generation + s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + newSeriesCount++ + + r.TSID = genTSID.TSID + prevTSID = r.TSID + prevMetricNameRaw = mr.MetricNameRaw + + if logNewSeries { + logger.Infof("new series created: %s", mn.String()) } - putPendingMetricRows(pmrs) - atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) } + + atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) + atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount) + atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + if firstWarn != nil { storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn) } @@ -1857,22 +1944,44 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second) -func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error { +// SetLogNewSeries updates new series logging. +// +// This function must be called before any calling any storage functions. +func SetLogNewSeries(ok bool) { + logNewSeries = ok +} + +var logNewSeries = false + +func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) error { + is.createGlobalIndexes(&genTSID.TSID, mn) + is.createPerDayIndexes(date, &genTSID.TSID, mn) + + // Store the TSID for for the current indexdb into cache, + // so future rows for that TSID are ingested via fast path. + s.putTSIDToCache(genTSID, metricNameRaw) + + // Register the (date, metricID) entry in the cache, + // so next time the entry is found there instead of searching for it in the indexdb. + s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID) + + return nil +} + +func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool { if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) { atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1) logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems()) - return errSeriesCardinalityExceeded + return false } if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) { atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1) logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems()) - return errSeriesCardinalityExceeded + return false } - return nil + return true } -var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded") - func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) { select { case <-logSkippedSeriesTicker.C: @@ -1895,75 +2004,6 @@ func getUserReadableMetricName(metricNameRaw []byte) string { return mn.String() } -type pendingMetricRow struct { - MetricName []byte - mr *MetricRow -} - -type pendingMetricRows struct { - pmrs []pendingMetricRow - metricNamesBuf []byte - - lastMetricNameRaw []byte - lastMetricName []byte - mn MetricName -} - -func (pmrs *pendingMetricRows) reset() { - mrs := pmrs.pmrs - for i := range mrs { - pmr := &mrs[i] - pmr.MetricName = nil - pmr.mr = nil - } - pmrs.pmrs = mrs[:0] - pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0] - pmrs.lastMetricNameRaw = nil - pmrs.lastMetricName = nil - pmrs.mn.Reset() -} - -func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error { - // Do not spend CPU time on re-calculating canonical metricName during bulk import - // of many rows for the same metric. - if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) { - if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { - return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err) - } - pmrs.mn.sortTags() - metricNamesBufLen := len(pmrs.metricNamesBuf) - pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf) - pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:] - pmrs.lastMetricNameRaw = mr.MetricNameRaw - } - mrs := pmrs.pmrs - if cap(mrs) > len(mrs) { - mrs = mrs[:len(mrs)+1] - } else { - mrs = append(mrs, pendingMetricRow{}) - } - pmrs.pmrs = mrs - pmr := &mrs[len(mrs)-1] - pmr.MetricName = pmrs.lastMetricName - pmr.mr = mr - return nil -} - -func getPendingMetricRows() *pendingMetricRows { - v := pendingMetricRowsPool.Get() - if v == nil { - v = &pendingMetricRows{} - } - return v.(*pendingMetricRows) -} - -func putPendingMetricRows(pmrs *pendingMetricRows) { - pmrs.reset() - pendingMetricRowsPool.Put(pmrs) -} - -var pendingMetricRowsPool sync.Pool - func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { var date uint64 var hour uint64 @@ -1983,11 +2023,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { // pMin linearly increases from 0 to 1 during the last hour of the day. pMin := (float64(ts%(3600*24)) / 3600) - 23 type pendingDateMetricID struct { - date uint64 - metricID uint64 - accountID uint32 - projectID uint32 - mr *MetricRow + date uint64 + tsid *TSID + mr *MetricRow } var pendingDateMetricIDs []pendingDateMetricID var pendingNextDayMetricIDs []uint64 @@ -2007,7 +2045,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { prevDate = date prevMetricID = metricID if hour == hm.hour { - // The r belongs to the current hour. Check for the current hour cache. + // The row belongs to the current hour. Check for the current hour cache. if hm.m.Has(metricID) { // Fast path: the metricID is in the current hour cache. // This means the metricID has been already added to per-day inverted index. @@ -2020,11 +2058,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { p := float64(uint32(fastHashUint64(metricID))) / (1 << 32) if p < pMin && !nextDayMetricIDs.Has(metricID) { pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ - date: date + 1, - metricID: metricID, - accountID: r.TSID.AccountID, - projectID: r.TSID.ProjectID, - mr: mrs[i], + date: date + 1, + tsid: &r.TSID, + mr: mrs[i], }) pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID) } @@ -2049,11 +2085,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { } // Slow path: store the (date, metricID) entry in the indexDB. pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{ - date: date, - metricID: metricID, - accountID: r.TSID.AccountID, - projectID: r.TSID.ProjectID, - mr: mrs[i], + date: date, + tsid: &r.TSID, + mr: mrs[i], }) } if len(pendingNextDayMetricIDs) > 0 { @@ -2067,7 +2101,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { s.pendingHourEntriesLock.Unlock() } if len(pendingDateMetricIDs) == 0 { - // Fast path - there are no new (date, metricID) entries in rows. + // Fast path - there are no new (date, metricID) entries. return nil } @@ -2078,37 +2112,31 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { sort.Slice(pendingDateMetricIDs, func(i, j int) bool { a := pendingDateMetricIDs[i] b := pendingDateMetricIDs[j] - if a.accountID != b.accountID { - return a.accountID < b.accountID + if a.tsid.AccountID != b.tsid.AccountID { + return a.tsid.AccountID < b.tsid.AccountID } - if a.projectID != b.projectID { - return a.projectID < b.projectID + if a.tsid.ProjectID != b.tsid.ProjectID { + return a.tsid.ProjectID < b.tsid.ProjectID } if a.date != b.date { return a.date < b.date } - return a.metricID < b.metricID + return a.tsid.MetricID < b.tsid.MetricID }) + idb := s.idb() is := idb.getIndexSearch(0, 0, noDeadline) defer idb.putIndexSearch(is) + var firstError error dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs)) mn := GetMetricName() for _, dmid := range pendingDateMetricIDs { date := dmid.date - metricID := dmid.metricID - ok, err := is.hasDateMetricID(date, metricID, dmid.accountID, dmid.projectID) - if err != nil { - if firstError == nil { - firstError = fmt.Errorf("error when locating (date=%s, metricID=%d, accountID=%d, projectID=%d) in database: %w", - dateToString(date), metricID, dmid.accountID, dmid.projectID, err) - } - continue - } - if !ok { + metricID := dmid.tsid.MetricID + if !is.hasDateMetricIDNoExtDB(date, metricID, dmid.tsid.AccountID, dmid.tsid.ProjectID) { // The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes. - // It is OK if the (date, metricID) entry is added multiple times to db + // It is OK if the (date, metricID) entry is added multiple times to indexdb // by concurrent goroutines. if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil { if firstError == nil { @@ -2117,7 +2145,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { continue } mn.sortTags() - is.createPerDayIndexes(date, metricID, mn) + is.createPerDayIndexes(date, dmid.tsid, mn) } dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{ date: date, diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index dc0546948..23fb117f7 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -916,9 +916,7 @@ func testStorageRegisterMetricNames(s *Storage) error { } mrs = append(mrs, mr) } - if err := s.RegisterMetricNames(nil, mrs); err != nil { - return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err) - } + s.RegisterMetricNames(nil, mrs) } var addIDsExpected []string for k := range addIDsMap {