lib/storage: switch from global to per-day index for MetricName -> TSID mapping

Previously all the newly ingested time series were registered in global `MetricName -> TSID` index.
This index was used during data ingestion for locating the TSID (internal series id)
for the given canonical metric name (the canonical metric name consists of metric name plus all its labels sorted by label names).

The `MetricName -> TSID` index is stored on disk in order to make sure that the data
isn't lost on VictoriaMetrics restart or unclean shutdown.

The lookup in this index is relatively slow, since VictoriaMetrics needs to read the corresponding
data block from disk, unpack it, put the unpacked block into `indexdb/dataBlocks` cache,
and then search for the given `MetricName -> TSID` entry there. So VictoriaMetrics
uses in-memory cache for speeding up the lookup for active time series.
This cache is named `storage/tsid`. If this cache capacity is enough for all the currently ingested
active time series, then VictoriaMetrics works fast, since it doesn't need to read the data from disk.

VictoriaMetrics starts reading data from `MetricName -> TSID` on-disk index in the following cases:

- If `storage/tsid` cache capacity isn't enough for active time series.
  Then just increase available memory for VictoriaMetrics or reduce the number of active time series
  ingested into VictoriaMetrics.

- If new time series is ingested into VictoriaMetrics. In this case it cannot find
  the needed entry in the `storage/tsid` cache, so it needs to consult on-disk `MetricName -> TSID` index,
  since it doesn't know that the index has no the corresponding entry too.
  This is a typical event under high churn rate, when old time series are constantly substituted
  with new time series.

Reading the data from `MetricName -> TSID` index is slow, so inserts, which lead to reading this index,
are counted as slow inserts, and they can be monitored via `vm_slow_row_inserts_total` metric exposed by VictoriaMetrics.

Prior to this commit the `MetricName -> TSID` index was global, e.g. it contained entries sorted by `MetricName`
for all the time series ever ingested into VictoriaMetrics during the configured -retentionPeriod.
This index can become very large under high churn rate and long retention. VictoriaMetrics
caches data from this index in `indexdb/dataBlocks` in-memory cache for speeding up index lookups.
The `indexdb/dataBlocks` cache may occupy significant share of available memory for storing
recently accessed blocks at `MetricName -> TSID` index when searching for newly ingested time series.

This commit switches from global `MetricName -> TSID` index to per-day index. This allows significantly
reducing the amounts of data, which needs to be cached in `indexdb/dataBlocks`, since now VictoriaMetrics
consults only the index for the current day when new time series is ingested into it.

The downside of this change is increased indexdb size on disk for workloads without high churn rate,
e.g. with static time series, which do no change over time, since now VictoriaMetrics needs to store
identical `MetricName -> TSID` entries for static time series for every day.

This change removes an optimization for reducing CPU and disk IO spikes at indexdb rotation,
since it didn't work correctly - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 .

At the same time the change fixes the issue, which could result in lost access to time series,
which stop receving new samples during the first hour after indexdb rotation - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698

The issue with the increased CPU and disk IO usage during indexdb rotation will be addressed
in a separate commit according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401#issuecomment-1553488685

This is a follow-up for 1f28b46ae9
This commit is contained in:
Aliaksandr Valialkin 2023-07-13 15:33:41 -07:00
parent 3b50b94f7a
commit 7094fa38bc
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
9 changed files with 444 additions and 604 deletions

View file

@ -138,9 +138,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)
mr.Timestamp = ct
}
if err := vmstorage.RegisterMetricNames(nil, mrs); err != nil {
return fmt.Errorf("cannot register paths: %w", err)
}
vmstorage.RegisterMetricNames(nil, mrs)
// Return response
contentType := "text/plain; charset=utf-8"

View file

@ -155,11 +155,10 @@ func AddRows(mrs []storage.MetricRow) error {
var errReadOnly = errors.New("the storage is in read-only mode; check -storage.minFreeDiskSpaceBytes command-line flag value")
// RegisterMetricNames registers all the metrics from mrs in the storage.
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) error {
func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow) {
WG.Add(1)
err := Storage.RegisterMetricNames(qt, mrs)
Storage.RegisterMetricNames(qt, mrs)
WG.Done()
return err
}
// DeleteSeries deletes series matching tfss.
@ -546,12 +545,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(idbm().PartsRefCount)
})
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
return float64(idbm().NewTimeseriesCreated)
})
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(idbm().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
return float64(idbm().MissingTSIDsForMetricID)
})
@ -666,6 +659,12 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().TooSmallTimestampRows)
})
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(m().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
return float64(m().NewTimeseriesCreated)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts)
})

View file

@ -27,7 +27,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
* FETURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). The change significantly reduces the size of `indexdb/dataBlocks` cache for such setups. The cache size can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_cache_size_bytes{type="indexdb/dataBlocks"}` metric.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402).

View file

@ -31,7 +31,17 @@ import (
const (
// Prefix for MetricName->TSID entries.
nsPrefixMetricNameToTSID = 0
//
// This index was substituted with nsPrefixDateMetricNameToTSID,
// since the MetricName->TSID index may require big amounts of memory for indexdb/dataBlocks cache
// when it grows big on the configured retention under high churn rate
// (e.g. when new time series are constantly registered).
//
// It is much more efficient from memory usage PoV to query per-day MetricName->TSID index
// (aka nsPrefixDateMetricNameToTSID) when the TSID must be obtained for the given MetricName
// during data ingestion under high churn rate and big retention.
//
// nsPrefixMetricNameToTSID = 0
// Prefix for Tag->MetricID entries.
nsPrefixTagToMetricIDs = 1
@ -50,6 +60,9 @@ const (
// Prefix for (Date,Tag)->MetricID entries.
nsPrefixDateTagToMetricIDs = 6
// Prefix for (Date,MetricName)->TSID entries.
nsPrefixDateMetricNameToTSID = 7
)
// indexDB represents an index db.
@ -59,12 +72,6 @@ type indexDB struct {
refCount uint64
// The counter for newly created time series. It can be used for determining time series churn rate.
newTimeseriesCreated uint64
// The counter for time series which were re-populated from previous indexDB after the rotation.
timeseriesRepopulated uint64
// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
@ -177,8 +184,6 @@ type IndexDBMetrics struct {
IndexDBRefCount uint64
NewTimeseriesCreated uint64
TimeseriesRepopulated uint64
MissingTSIDsForMetricID uint64
RecentHourMetricIDsSearchCalls uint64
@ -219,8 +224,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
@ -377,33 +380,6 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.s.metricNameCache.Set(key[:], metricName)
}
// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
//
// The probability increases from 0 to 100% during the first hour since db rotation.
//
// It returns true if new index entry was created, and false if it was skipped.
func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
if pMin < 1 {
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
if p > pMin {
// Fast path: there is no need creating indexes for metricNameRaw yet.
return false, nil
}
}
// Slow path: create indexes for (tsid, metricNameRaw) at db.
mn := GetMetricName()
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
}
mn.sortTags()
is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, tsid.MetricID, mn)
PutMetricName(mn)
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
prefix := ^uint64(0)
if versioned {
@ -473,25 +449,28 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
return dst, nil
}
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
//
// It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
is := db.getIndexSearch(noDeadline)
err := is.getTSIDByMetricName(dst, metricName)
db.putIndexSearch(is)
if err == nil {
return nil
}
if err != io.EOF {
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
// It returns false if the given metricName isn't found in the indexdb.
func (is *indexSearch) getTSIDByMetricName(dst *generationTSID, metricName []byte, date uint64) bool {
if is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) {
// Fast path - the TSID is found in the current indexdb.
dst.generation = is.db.generation
return true
}
// Do not search for the TSID in the external storage,
// since this function is already called by another indexDB instance.
// The TSID for the given mn wasn't found.
return io.EOF
// Slow path - search for the TSID in the previous indexdb
ok := false
deadline := is.deadline
is.db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
ok = is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date)
extDB.putIndexSearch(is)
if ok {
dst.generation = extDB.generation
}
})
return ok
}
type indexSearch struct {
@ -502,55 +481,6 @@ type indexSearch struct {
// deadline in unix timestamp seconds for the given search.
deadline uint64
// tsidByNameMisses and tsidByNameSkips is used for a performance
// hack in GetOrCreateTSIDByName. See the comment there.
tsidByNameMisses int
tsidByNameSkips int
}
// GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
//
// It also registers the metricName in global and per-day indexes
// for the given date if the metricName->TSID entry is missing in the index.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// A hack: skip searching for the TSID after many serial misses.
// This should improve insertion performance for big batches
// of new time series.
if is.tsidByNameMisses < 100 {
err := is.getTSIDByMetricName(dst, metricName)
if err == nil {
// Fast path - the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
// There is no need in checking whether the TSID is present in the per-day index for the given date,
// since this check must be performed by the caller in an optimized way.
// See storage.updatePerDateData() function.
return nil
}
if err != io.EOF {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err)
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
if is.tsidByNameSkips > 10000 {
is.tsidByNameSkips = 0
is.tsidByNameMisses = 0
}
}
// TSID for the given name wasn't found. Create it.
// It is OK if duplicate TSID for mn is created by concurrent goroutines.
// Metric results will be merged by mn after TableSearch.
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
}
return nil
}
func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
@ -572,75 +502,9 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
is.mp.Reset()
is.deadline = 0
// Do not reset tsidByNameMisses and tsidByNameSkips,
// since they are used in GetOrCreateTSIDByName across call boundaries.
db.indexSearchPool.Put(is)
}
func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
mn := GetMetricName()
defer PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
}
created, err := is.db.getOrCreateTSID(dst, metricName, mn)
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
is.createGlobalIndexes(dst, mn)
is.createPerDayIndexes(date, dst.MetricID, mn)
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
if created {
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
}
return nil
}
// SetLogNewSeries updates new series logging.
//
// This function must be called before any calling any storage functions.
func SetLogNewSeries(ok bool) {
logNewSeries = ok
}
var logNewSeries = false
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
//
// Returns true if TSID was created or false if TSID was in extDB
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
// Search the TSID in the external storage.
// This is usually the db from the previous period.
var err error
if db.doExtDB(func(extDB *indexDB) {
err = extDB.getTSIDByNameNoCreate(dst, metricName)
}) {
if err == nil {
// The TSID has been found in the external storage.
return false, nil
}
if err != io.EOF {
return false, fmt.Errorf("external search failed: %w", err)
}
}
// The TSID wasn't found in the external storage.
// Generate it locally.
generateTSID(dst, mn)
return true, nil
}
func generateTSID(dst *TSID, mn *MetricName) {
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
// Assume that the job-like metric is put at mn.Tags[0], while instance-like metric is put at mn.Tags[1]
@ -664,13 +528,6 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
@ -1935,26 +1792,27 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte, date uint64) bool {
dmis := is.db.s.getDeletedMetricIDs()
ts := &is.ts
kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateMetricNameToTSID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B, metricName...)
kb.B = append(kb.B, kvSeparatorChar)
ts.Seek(kb.B)
for ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) {
// Nothing found.
return io.EOF
return false
}
v := ts.Item[len(kb.B):]
tail, err := dst.Unmarshal(v)
if err != nil {
return fmt.Errorf("cannot unmarshal TSID: %w", err)
logger.Panicf("FATAL: cannot unmarshal TSID: %s", err)
}
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling TSID: %X", tail)
}
if dmis.Len() > 0 {
// Verify whether the dst is marked as deleted.
@ -1964,13 +1822,13 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
}
}
// Found valid dst.
return nil
return true
}
if err := ts.Error(); err != nil {
return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
logger.Panicf("FATAL: error when searching TSID by metricName; searchPrefix %q: %s", kb.B, err)
}
// Nothing found
return io.EOF
return false
}
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
@ -2821,13 +2679,21 @@ const (
int64Max = int64((1 << 63) - 1)
)
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = encoding.MarshalUint64(ii.B, metricID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.Next()
// Create per-day inverted index entries for TSID.
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateMetricNameToTSID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create per-day inverted index entries for metricID.
@ -2835,9 +2701,8 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, metricID)
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
is.db.tb.AddItems(ii.Items)
is.db.s.dateMetricIDCache.Set(date, metricID)
}
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
@ -2947,22 +2812,24 @@ func reverseBytes(dst, src []byte) []byte {
return dst
}
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64) bool {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = encoding.MarshalUint64(kb.B, metricID)
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
if err == io.EOF {
return false, nil
err := ts.FirstItemWithPrefix(kb.B)
if err == nil {
if string(ts.Item) != string(kb.B) {
logger.Panicf("FATAL: unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
}
return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err)
// Fast path - the (date, metricID) entry is found in the current indexdb.
return true
}
if string(ts.Item) != string(kb.B) {
return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
if err != io.EOF {
logger.Panicf("FATAL: unexpected error when searching for (date=%s, metricID=%d) entry: %s", dateToString(date), metricID, err)
}
return true, nil
return false
}
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,

View file

@ -491,12 +491,11 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) {
}
func TestIndexDBOpenClose(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
var s Storage
tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ {
var isReadOnly uint32
db := mustOpenIndexDB(tableName, s, 0, &isReadOnly)
db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly)
db.MustClose()
}
if err := os.RemoveAll(tableName); err != nil {
@ -508,19 +507,10 @@ func TestIndexDB(t *testing.T) {
const metricGroups = 10
t.Run("serial", func(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
t.Fatalf("cannot remove indexDB: %s", err)
}
}()
const path = "TestIndexDB-serial"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
mns, tsids, err := testIndexDBGetOrCreateTSIDByName(db, metricGroups)
if err != nil {
t.Fatalf("unexpected error: %s", err)
@ -529,27 +519,23 @@ func TestIndexDB(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
// Re-open the db and verify it works as expected.
db.MustClose()
db = mustOpenIndexDB(dbName, s, 0, &isReadOnly)
// Re-open the storage and verify it works as expected.
s.MustClose()
s = MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db = s.idb()
if err := testIndexDBCheckTSIDByName(db, mns, tsids, false); err != nil {
t.Fatalf("unexpected error: %s", err)
}
s.MustClose()
fs.MustRemoveAll(path)
})
t.Run("concurrent", func(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
t.Fatalf("cannot remove indexDB: %s", err)
}
}()
const path = "TestIndexDB-concurrent"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
ch := make(chan error, 3)
for i := 0; i < cap(ch); i++ {
@ -566,20 +552,20 @@ func TestIndexDB(t *testing.T) {
ch <- nil
}()
}
var errors []error
deadlineCh := time.After(30 * time.Second)
for i := 0; i < cap(ch); i++ {
select {
case err := <-ch:
if err != nil {
errors = append(errors, fmt.Errorf("unexpected error: %w", err))
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(30 * time.Second):
case <-deadlineCh:
t.Fatalf("timeout")
}
}
if len(errors) > 0 {
t.Fatal(errors[0])
}
s.MustClose()
fs.MustRemoveAll(path)
})
}
@ -590,11 +576,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
var tsids []TSID
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
date := uint64(timestampFromTime(time.Now())) / msecPerDay
var metricNameBuf []byte
var metricNameRawBuf []byte
for i := 0; i < 4e2+1; i++ {
for i := 0; i < 401; i++ {
var mn MetricName
// Init MetricGroup.
@ -612,24 +599,20 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
// Create tsid for the metricName.
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
}
mns = append(mns, mn)
tsids = append(tsids, tsid)
}
// fill Date -> MetricID cache
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
tsids = append(tsids, genTSID.TSID)
}
db.putIndexSearch(is)
// Flush index to disk, so it becomes visible for search
db.tb.DebugFlush()
db.s.DebugFlush()
return mns, tsids, nil
}
@ -644,8 +627,9 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return false
}
currentTime := timestampFromTime(time.Now())
timeseriesCounters := make(map[uint64]bool)
var tsidCopy TSID
var genTSID generationTSID
var metricNameCopy []byte
allLabelNames := make(map[string]bool)
for i := range mns {
@ -658,26 +642,29 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
mn.sortTags()
metricName := mn.Marshal(nil)
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil {
return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err)
is := db.getIndexSearch(noDeadline)
if !is.getTSIDByMetricName(&genTSID, metricName, uint64(currentTime)/msecPerDay) {
return fmt.Errorf("cannot obtain tsid #%d for mn %s", i, mn)
}
db.putIndexSearch(is)
if isConcurrent {
// Copy tsid.MetricID, since multiple TSIDs may match
// the same mn in concurrent mode.
tsidCopy.MetricID = tsid.MetricID
genTSID.TSID.MetricID = tsid.MetricID
}
if !reflect.DeepEqual(tsid, &tsidCopy) {
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &tsidCopy, tsid)
if !reflect.DeepEqual(tsid, &genTSID.TSID) {
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &genTSID.TSID, tsid)
}
// Search for metric name for the given metricID.
var err error
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID)
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID)
if err != nil {
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", genTSID.TSID.MetricID, i, err)
}
if !bytes.Equal(metricName, metricNameCopy) {
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", genTSID.TSID.MetricID, metricNameCopy, metricName)
}
// Try searching metric name for non-existent MetricID.
@ -738,7 +725,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Try tag filters.
currentTime := timestampFromTime(time.Now())
tr := TimeRange{
MinTimestamp: currentTime - msecPerDay,
MaxTimestamp: currentTime + msecPerDay,
@ -1447,12 +1433,6 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
r := rand.New(rand.NewSource(1))
path := "TestIndexRepopulateAfterRotation"
s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5)
defer func() {
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}()
db := s.idb()
if db.generation == 0 {
@ -1460,9 +1440,11 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
const metricRowsN = 1000
// use min-max timestamps of 1month range to create smaller number of partitions
timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now()
mrs := testGenerateMetricRows(r, metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli())
currentDayTimestamp := (time.Now().UnixMilli() / msecPerDay) * msecPerDay
timeMin := currentDayTimestamp - 24*3600*1000
timeMax := currentDayTimestamp + 24*3600*1000
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
@ -1476,7 +1458,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
// check new series were registered in indexDB
added := atomic.LoadUint64(&db.newTimeseriesCreated)
added := atomic.LoadUint64(&db.s.newTimeseriesCreated)
if added != metricRowsN {
t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added)
}
@ -1516,47 +1498,31 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", dbNew.generation)
}
// Re-insert rows again and verify that entries belong prevGeneration and dbNew.generation,
// while the majority of entries remain at prevGeneration.
// Re-insert rows again and verify that all the entries belong to new generation
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
s.DebugFlush()
entriesByGeneration := make(map[uint64]int)
for _, mr := range mrs {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
entriesByGeneration[genTSID.generation]++
if genTSID.generation != dbNew.generation {
t.Fatalf("unexpected generation for data after rotation; got %d; want %d", genTSID.generation, dbNew.generation)
}
}
if len(entriesByGeneration) > 2 {
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
}
prevEntries := entriesByGeneration[prevGeneration]
currEntries := entriesByGeneration[dbNew.generation]
totalEntries := prevEntries + currEntries
if totalEntries != metricRowsN {
t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN)
}
if float64(currEntries)/float64(totalEntries) > 0.1 {
t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func TestSearchTSIDWithTimeRange(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
t.Fatalf("cannot remove indexDB: %s", err)
}
}()
const path = "TestSearchTSIDWithTimeRange"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
// Create a bunch of per-day time series
const days = 5
@ -1576,8 +1542,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
sort.Strings(labelNames)
for day := 0; day < days; day++ {
var tsids []TSID
var mns []MetricName
date := baseDate - uint64(day)
var metricIDs uint64set.Set
for metric := 0; metric < metricsPerDay; metric++ {
var mn MetricName
mn.MetricGroup = []byte("testMetric")
@ -1601,31 +1567,24 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
}
mns = append(mns, mn)
tsids = append(tsids, tsid)
metricIDs.Add(genTSID.TSID.MetricID)
}
// Add the metrics to the per-day stores
date := baseDate - uint64(day)
var metricIDs uint64set.Set
for i := range tsids {
tsid := &tsids[i]
metricIDs.Add(tsid.MetricID)
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
}
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs
}
db.putIndexSearch(is)
// Flush index to disk, so it becomes visible for search
db.tb.DebugFlush()
s.DebugFlush()
is2 := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is2)
// Check that all the metrics are found for all the days.
for date := baseDate - days + 1; date <= baseDate; date++ {
@ -1646,6 +1605,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if !allMetricIDs.Equal(metricIDs) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}
db.putIndexSearch(is2)
// Check SearchLabelNamesWithFiltersOnTimeRange with the specified time range.
tr := TimeRange{
@ -1991,6 +1951,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
if status.TotalLabelValuePairs != expectedLabelValuePairs {
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
}
s.MustClose()
fs.MustRemoveAll(path)
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
@ -2012,8 +1975,6 @@ func newTestStorage() *Storage {
retentionMsecs: maxRetentionMsecs,
}
s.setDeletedMetricIDs(&uint64set.Set{})
var idb *indexDB
s.idbCurr.Store(idb)
return s
}

View file

@ -2,11 +2,12 @@ package storage
import (
"fmt"
"os"
"regexp"
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func BenchmarkRegexpFilterMatch(b *testing.B) {
@ -38,27 +39,18 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) {
}
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const path = "BenchmarkIndexDBAddTSIDs"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
const recordsPerLoop = 1e3
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
b.ReportAllocs()
b.SetBytes(recordsPerLoop)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var mn MetricName
var tsid TSID
var genTSID generationTSID
// The most common tags.
mn.Tags = []Tag{
@ -72,15 +64,18 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
startOffset := 0
for pb.Next() {
benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop)
benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, startOffset, recordsPerLoop)
startOffset += recordsPerLoop
}
})
b.StopTimer()
s.MustClose()
fs.MustRemoveAll(path)
}
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
date := uint64(0)
var metricNameRaw []byte
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
@ -90,48 +85,38 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
mn.Tags[j].Value = strconv.AppendUint(mn.Tags[j].Value[:0], uint64(i*j), 16)
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
panic(fmt.Errorf("cannot insert record: %w", err))
}
generateTSID(&genTSID.TSID, mn)
genTSID.generation = db.generation
db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date)
}
}
func BenchmarkHeadPostingForMatchers(b *testing.B) {
// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
const path = "BenchmarkHeadPostingForMatchers"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
// Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66
var mn MetricName
var metricName []byte
var metricNameRaw []byte
var tsid TSID
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
var mn MetricName
var metricNameRaw []byte
var genTSID generationTSID
date := uint64(0)
addSeries := func(kvs ...string) {
mn.Reset()
for i := 0; i < len(kvs); i += 2 {
mn.AddTag(kvs[i], kvs[i+1])
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
}
for n := 0; n < 10; n++ {
ns := strconv.Itoa(n)
@ -147,12 +132,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
// Make sure all the items can be searched.
db.tb.DebugFlush()
db.s.DebugFlush()
b.ResetTimer()
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
tfss := []*TagFilters{tfs}
tr := TimeRange{
MinTimestamp: 0,
@ -167,6 +151,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
b.Fatalf("unexpected metricIDs found; got %d; want %d", len(metricIDs), expectedMetricIDs)
}
}
db.putIndexSearch(is)
}
addTagFilter := func(tfs *TagFilters, key, value string, isNegative, isRegexp bool) {
if err := tfs.Add([]byte(key), []byte(value), isNegative, isRegexp); err != nil {
@ -275,21 +260,15 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
addTagFilter(tfs, "j", "foo", false, false)
benchSearch(b, tfs, 88889)
})
s.MustClose()
fs.MustRemoveAll(path)
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
s := newTestStorage()
defer stopTestStorage(s)
dbName := nextIndexDBTableName()
var isReadOnly uint32
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
defer func() {
db.MustClose()
if err := os.RemoveAll(dbName); err != nil {
b.Fatalf("cannot remove indexDB: %s", err)
}
}()
const path = "BenchmarkIndexDBGetTSIDs"
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
db := s.idb()
const recordsPerLoop = 1000
const recordsCount = 1e5
@ -302,41 +281,45 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
value := fmt.Sprintf("value_%d", i)
mn.AddTag(key, value)
}
var tsid TSID
var metricName []byte
var genTSID generationTSID
var metricNameRaw []byte
date := uint64(0)
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ {
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricName[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
}
db.s.DebugFlush()
b.SetBytes(recordsPerLoop)
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var tsidLocal TSID
var genTSIDLocal generationTSID
var metricNameLocal []byte
var metricNameLocalRaw []byte
mnLocal := mn
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
for pb.Next() {
for i := 0; i < recordsPerLoop; i++ {
mnLocal.sortTags()
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0])
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil {
panic(fmt.Errorf("cannot obtain tsid: %w", err))
if !is.getTSIDByMetricName(&genTSIDLocal, metricNameLocal, date) {
panic(fmt.Errorf("cannot obtain tsid for row %d", i))
}
}
}
db.putIndexSearch(is)
})
b.StopTimer()
s.MustClose()
fs.MustRemoveAll(path)
}

View file

@ -172,14 +172,6 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt := mustCreatePartition(ptt, "small-table", "big-table", strg)
smallPartsPath := pt.smallPartsPath
bigPartsPath := pt.bigPartsPath
defer func() {
if err := os.RemoveAll("small-table"); err != nil {
t.Fatalf("cannot remove small parts directory: %s", err)
}
if err := os.RemoveAll("big-table"); err != nil {
t.Fatalf("cannot remove big parts directory: %s", err)
}
}()
var tmpRows []rawRow
for _, rows := range rowss {
pt.AddRows(rows)
@ -194,6 +186,13 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg)
testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected)
pt.MustClose()
if err := os.RemoveAll("small-table"); err != nil {
t.Fatalf("cannot remove small parts directory: %s", err)
}
if err := os.RemoveAll("big-table"); err != nil {
t.Fatalf("cannot remove big parts directory: %s", err)
}
}
func testPartitionSearch(t *testing.T, pt *partition, tsids []TSID, tr TimeRange, rbsExpected []rawBlock, rowsCountExpected int64) {

View file

@ -2,7 +2,6 @@ package storage
import (
"bytes"
"errors"
"fmt"
"io"
"math"
@ -45,6 +44,8 @@ type Storage struct {
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
timeseriesRepopulated uint64
newTimeseriesCreated uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -293,7 +294,11 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
// since it may slow down data ingestion when used frequently.
func (s *Storage) DebugFlush() {
s.tb.flushPendingRows()
s.idb().tb.DebugFlush()
idb := s.idb()
idb.tb.DebugFlush()
idb.doExtDB(func(extDB *indexDB) {
extDB.tb.DebugFlush()
})
}
// CreateSnapshot creates snapshot for s and returns the snapshot name.
@ -440,6 +445,8 @@ type Metrics struct {
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
TimeseriesRepopulated uint64
NewTimeseriesCreated uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64
@ -509,6 +516,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated)
m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@ -723,8 +732,9 @@ func (s *Storage) mustRotateIndexDB() {
// Persist changes on the file system.
fs.MustSyncPath(s.path)
// Do not flush tsidCache to avoid read/write path slowdown
// and slowly re-populate new idb with entries from the cache via maybeCreateIndexes().
// Do not flush tsidCache to avoid read/write path slowdown.
// The cache is automatically re-populated with new TSID entries
// with the updated indexdb generation.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
// Flush metric id caches for the current and the previous hour,
@ -738,7 +748,7 @@ func (s *Storage) mustRotateIndexDB() {
// These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb.
// 4. Stop adding new samples for these series just before 5 UTC.
// 5. The next indexdb rotation is performed at 4 UTC next day.
// The information about the series from step 5 disappears from indexdb, since the old indexdb from step 1 is deleted,
// The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
// while the current indexdb doesn't contain information about the series.
// So queries for the last 24 hours stop returning samples added at step 3.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
@ -815,7 +825,6 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
name := "next_day_metric_ids"
path := filepath.Join(s.cachePath, name)
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return e
}
src, err := os.ReadFile(path)
@ -855,7 +864,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
}
path := filepath.Join(s.cachePath, name)
if !fs.IsPathExist(path) {
logger.Infof("nothing to load from %q", path)
return hm
}
src, err := os.ReadFile(path)
@ -1176,8 +1184,7 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int,
if err != nil {
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
}
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
// Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs.
// Do not reset MetricID->MetricName cache, since it must be used only
// after filtering out deleted metricIDs.
@ -1531,61 +1538,129 @@ var metricRowsInsertCtxPool sync.Pool
const maxMetricRowsPerBlock = 8000
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later.
//
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// The the MetricRow.Timestamp is used for registering the metric name at the given day according to the timestamp.
// Th MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
qt = qt.NewChild("registering %d series", len(mrs))
defer qt.Done()
var metricName []byte
var metricNameBuf []byte
var genTSID generationTSID
mn := GetMetricName()
defer PutMetricName(mn)
var seriesRepopulated uint64
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
var firstWarn error
for i := range mrs {
mr := &mrs[i]
date := uint64(mr.Timestamp) / msecPerDay
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
// Fast path - mr.MetricNameRaw has been already registered in the current idb.
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip row, since it exceeds cardinality limit
continue
}
if genTSID.generation == idb.generation {
// Fast path - mr.MetricNameRaw has been already registered in the current idb.
continue
if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
continue
}
mn.sortTags()
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
}
continue
}
// Slow path - register mr.MetricNameRaw.
// Slow path - search TSID for the given metricName in indexdb.
// Construct canonical metric name - it is used below.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
continue
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
date := uint64(mr.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
if errors.Is(err, errSeriesCardinalityExceeded) {
metricNameBuf = mn.Marshal(metricNameBuf[:0])
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
continue
}
return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)
if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
continue
}
// Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it.
generateTSID(&genTSID.TSID, mn)
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
continue
}
// Schedule creating TSID indexes instead of creating them synchronously.
// This should keep stable the ingestion rate when new time series are ingested.
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
}
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
if firstWarn != nil {
logger.Warnf("cannot create some metrics: %s", firstWarn)
}
return nil
}
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
mn := GetMetricName()
defer PutMetricName(mn)
var (
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
prevTSID TSID
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
var metricNameBuf []byte
var slowInsertsCount uint64
var newSeriesCount uint64
var seriesRepopulated uint64
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
var genTSID generationTSID
@ -1629,6 +1704,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
@ -1636,99 +1713,109 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
// Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteSeries code for details.
if !s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip row, since it exceeds cardinality limit
j--
continue
}
r.TSID = genTSID.TSID
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteSeries code for details.
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if genTSID.generation != idb.generation {
// The found entry is from the previous cache generation,
// so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
// The found TSID is from the previous indexdb. Create it in the current indexdb.
date := uint64(r.Timestamp) / msecPerDay
created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date)
if err != nil {
return fmt.Errorf("cannot create indexes: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
}
continue
}
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j--
if pmrs == nil {
pmrs = getPendingMetricRows()
}
if err := pmrs.addRow(mr); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = err
}
continue
}
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
mr := pmr.mr
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
slowInsertsCount++
date := uint64(r.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
j--
if errors.Is(err, errSeriesCardinalityExceeded) {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)
}
mn.sortTags()
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
slowInsertsCount++
}
continue
}
// Slow path - the TSID for the given mr.MetricNameRaw is missing in the cache.
slowInsertsCount++
date := uint64(r.Timestamp) / msecPerDay
// Construct canonical metric name - it is used below.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])
// Search for TSID for the given mr.MetricNameRaw in the indexdb.
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
j--
continue
}
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
r.TSID = genTSID.TSID
prevTSID = genTSID.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}
// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
generateTSID(&genTSID.TSID, mn)
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
j--
continue
}
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
newSeriesCount++
r.TSID = genTSID.TSID
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
putPendingMetricRows(pmrs)
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
}
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount)
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
if firstWarn != nil {
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
}
@ -1749,22 +1836,44 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
// SetLogNewSeries updates new series logging.
//
// This function must be called before any calling any storage functions.
func SetLogNewSeries(ok bool) {
logNewSeries = ok
}
var logNewSeries = false
func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) error {
is.createGlobalIndexes(&genTSID.TSID, mn)
is.createPerDayIndexes(date, &genTSID.TSID, mn)
// Store the TSID for for the current indexdb into cache,
// so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(genTSID, metricNameRaw)
// Register the (date, metricID) entry in the cache,
// so next time the entry is found there instead of searching for it in the indexdb.
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
return nil
}
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool {
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
return false
}
if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
return false
}
return nil
return true
}
var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
select {
case <-logSkippedSeriesTicker.C:
@ -1787,75 +1896,6 @@ func getUserReadableMetricName(metricNameRaw []byte) string {
return mn.String()
}
type pendingMetricRow struct {
MetricName []byte
mr *MetricRow
}
type pendingMetricRows struct {
pmrs []pendingMetricRow
metricNamesBuf []byte
lastMetricNameRaw []byte
lastMetricName []byte
mn MetricName
}
func (pmrs *pendingMetricRows) reset() {
mrs := pmrs.pmrs
for i := range mrs {
pmr := &mrs[i]
pmr.MetricName = nil
pmr.mr = nil
}
pmrs.pmrs = mrs[:0]
pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
pmrs.lastMetricNameRaw = nil
pmrs.lastMetricName = nil
pmrs.mn.Reset()
}
func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
pmrs.mn.sortTags()
metricNamesBufLen := len(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:]
pmrs.lastMetricNameRaw = mr.MetricNameRaw
}
mrs := pmrs.pmrs
if cap(mrs) > len(mrs) {
mrs = mrs[:len(mrs)+1]
} else {
mrs = append(mrs, pendingMetricRow{})
}
pmrs.pmrs = mrs
pmr := &mrs[len(mrs)-1]
pmr.MetricName = pmrs.lastMetricName
pmr.mr = mr
return nil
}
func getPendingMetricRows() *pendingMetricRows {
v := pendingMetricRowsPool.Get()
if v == nil {
v = &pendingMetricRows{}
}
return v.(*pendingMetricRows)
}
func putPendingMetricRows(pmrs *pendingMetricRows) {
pmrs.reset()
pendingMetricRowsPool.Put(pmrs)
}
var pendingMetricRowsPool sync.Pool
func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
var date uint64
var hour uint64
@ -1875,9 +1915,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
type pendingDateMetricID struct {
date uint64
metricID uint64
mr *MetricRow
date uint64
tsid *TSID
mr *MetricRow
}
var pendingDateMetricIDs []pendingDateMetricID
var pendingNextDayMetricIDs []uint64
@ -1897,7 +1937,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
prevDate = date
prevMetricID = metricID
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
// The row belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
@ -1910,9 +1950,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
if p < pMin && !nextDayMetricIDs.Has(metricID) {
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1,
metricID: metricID,
mr: mrs[i],
date: date + 1,
tsid: &r.TSID,
mr: mrs[i],
})
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
}
@ -1932,9 +1972,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
mr: mrs[i],
date: date,
tsid: &r.TSID,
mr: mrs[i],
})
}
if len(pendingNextDayMetricIDs) > 0 {
@ -1948,7 +1988,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
s.pendingHourEntriesLock.Unlock()
}
if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entries in rows.
// Fast path - there are no new (date, metricID) entries.
return nil
}
@ -1962,27 +2002,22 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
if a.date != b.date {
return a.date < b.date
}
return a.metricID < b.metricID
return a.tsid.MetricID < b.tsid.MetricID
})
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
var firstError error
dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs))
mn := GetMetricName()
for _, dmid := range pendingDateMetricIDs {
date := dmid.date
metricID := dmid.metricID
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when locating (date=%s, metricID=%d) in database: %w", dateToString(date), metricID, err)
}
continue
}
if !ok {
metricID := dmid.tsid.MetricID
if !is.hasDateMetricIDNoExtDB(date, metricID) {
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
// It is OK if the (date, metricID) entry is added multiple times to db
// It is OK if the (date, metricID) entry is added multiple times to indexdb
// by concurrent goroutines.
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
if firstError == nil {
@ -1991,7 +2026,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue
}
mn.sortTags()
is.createPerDayIndexes(date, metricID, mn)
is.createPerDayIndexes(date, dmid.tsid, mn)
}
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,

View file

@ -823,9 +823,7 @@ func testStorageRegisterMetricNames(s *Storage) error {
}
mrs = append(mrs, mr)
}
if err := s.RegisterMetricNames(nil, mrs); err != nil {
return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
}
s.RegisterMetricNames(nil, mrs)
}
var addIDsExpected []string
for k := range addIDsMap {