lib/storage: switch from global to per-day index for MetricName -> TSID mapping
Previously all the newly ingested time series were registered in the global `MetricName -> TSID` index.
This index was used during data ingestion for locating the TSID (internal series ID)
for the given canonical metric name (the canonical metric name consists of the metric name plus all its labels sorted by label name).
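For illustration, here is a minimal, hypothetical sketch of what a canonical metric name is conceptually: the metric name followed by its labels sorted by label name. The real code builds a binary form via `MetricName.sortTags()` and `MetricName.Marshal()`, not this textual one.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// canonicalMetricName builds a canonical series key: the metric name followed
// by all its labels sorted by label name, so that the same series always maps
// to the same key regardless of the label order in the incoming sample.
func canonicalMetricName(name string, labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	b.WriteString(name)
	for _, k := range keys {
		fmt.Fprintf(&b, ",%s=%q", k, labels[k])
	}
	return b.String()
}

func main() {
	fmt.Println(canonicalMetricName("http_requests_total", map[string]string{
		"job":      "api",
		"instance": "host-1",
	}))
	// prints: http_requests_total,instance="host-1",job="api"
}
```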
The `MetricName -> TSID` index is stored on disk in order to make sure that the data
isn't lost on VictoriaMetrics restart or unclean shutdown.
The lookup in this index is relatively slow, since VictoriaMetrics needs to read the corresponding
data block from disk, unpack it, put the unpacked block into the `indexdb/dataBlocks` cache,
and then search for the given `MetricName -> TSID` entry there. So VictoriaMetrics
uses an in-memory cache named `storage/tsid` for speeding up the lookup for active time series.
If this cache is big enough to hold all the currently ingested active time series,
then VictoriaMetrics works fast, since it doesn't need to read the data from disk.
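A simplified, self-contained sketch of this lookup order, using hypothetical types rather than the actual VictoriaMetrics API: consult the `storage/tsid` cache first and fall back to the disk-backed index only on a cache miss.

```go
package main

import "fmt"

// TSID and the diskIndex interface below are illustrative only and do not
// match the actual VictoriaMetrics API.
type TSID struct{ MetricID uint64 }

type diskIndex interface {
	// lookup reads the relevant index block from disk (via the
	// indexdb/dataBlocks cache) and searches it for the given entry.
	lookup(canonicalName string) (TSID, bool)
}

// lookupTSID consults the in-memory storage/tsid cache first and falls back
// to the on-disk MetricName -> TSID index only on a cache miss.
func lookupTSID(cache map[string]TSID, idx diskIndex, canonicalName string) (TSID, bool) {
	if tsid, ok := cache[canonicalName]; ok {
		return tsid, true // fast path: active series, no disk access needed
	}
	tsid, ok := idx.lookup(canonicalName) // slow path: disk-backed lookup
	if ok {
		cache[canonicalName] = tsid
	}
	return tsid, ok
}

type fakeIndex map[string]TSID

func (f fakeIndex) lookup(name string) (TSID, bool) {
	tsid, ok := f[name]
	return tsid, ok
}

func main() {
	cache := map[string]TSID{}
	idx := fakeIndex{`cpu_usage{host="a"}`: {MetricID: 42}}
	fmt.Println(lookupTSID(cache, idx, `cpu_usage{host="a"}`))
}
```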
VictoriaMetrics starts reading data from the on-disk `MetricName -> TSID` index in the following cases:
- The `storage/tsid` cache capacity isn't enough for the active time series.
In this case either increase the memory available to VictoriaMetrics or reduce the number of active time series
ingested into VictoriaMetrics.
- A new time series is ingested into VictoriaMetrics. In this case VictoriaMetrics cannot find
the needed entry in the `storage/tsid` cache, so it needs to consult the on-disk `MetricName -> TSID` index,
since it doesn't know whether that index contains the corresponding entry either.
This is a typical event under high churn rate, when old time series are constantly substituted
with new time series.
Reading the data from the `MetricName -> TSID` index is slow, so inserts which lead to reading this index
are counted as slow inserts. They can be monitored via the `vm_slow_row_inserts_total` metric exposed by VictoriaMetrics.
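The counter-exposure pattern used for such metrics (and visible in the diff below) is an atomic counter read by a gauge callback from the github.com/VictoriaMetrics/metrics package. A minimal illustration with hypothetical variable and function names:

```go
// A minimal illustration of the pattern used in the diff below: an atomic
// counter incremented on the slow path and exported via a gauge callback.
// Variable and function names here are illustrative, not VictoriaMetrics code.
package main

import (
	"sync/atomic"

	"github.com/VictoriaMetrics/metrics"
)

var slowRowInserts uint64

// onSlowInsert is called whenever an insert has to read the on-disk index.
func onSlowInsert() {
	atomic.AddUint64(&slowRowInserts, 1)
}

func main() {
	// The callback re-reads the counter on every /metrics scrape.
	metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
		return float64(atomic.LoadUint64(&slowRowInserts))
	})
	onSlowInsert()
}
```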
Prior to this commit the `MetricName -> TSID` index was global, i.e. it contained entries sorted by `MetricName`
for all the time series ever ingested into VictoriaMetrics during the configured -retentionPeriod.
This index can become very large under high churn rate and long retention. VictoriaMetrics
caches data from this index in the `indexdb/dataBlocks` in-memory cache for speeding up index lookups.
The `indexdb/dataBlocks` cache may occupy a significant share of the available memory for storing
recently accessed blocks of the `MetricName -> TSID` index when searching for newly ingested time series.
This commit switches from the global `MetricName -> TSID` index to a per-day index. This significantly
reduces the amount of data which needs to be cached in `indexdb/dataBlocks`, since now VictoriaMetrics
consults only the index for the current day when a new time series is ingested.
The downside of this change is an increased indexdb size on disk for workloads without high churn rate,
e.g. with static time series which do not change over time, since now VictoriaMetrics needs to store
identical `MetricName -> TSID` entries for static time series for every day.
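A rough sketch of the per-day entry layout, based on the createPerDayIndexes() changes in the diff below. The constant values and marshaling details are simplified here and don't reproduce the exact on-disk format.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// marshalPerDayEntry sketches the (Date, MetricName) -> TSID entry layout:
// prefix byte, day number, canonical metric name, separator, marshaled TSID.
// The constant values below are illustrative; see nsPrefixDateMetricNameToTSID
// and kvSeparatorChar in the actual code.
func marshalPerDayEntry(dst []byte, date uint64, canonicalMetricName, marshaledTSID []byte) []byte {
	const nsPrefixDateMetricNameToTSID = 7 // the new prefix added in the diff below
	const kvSeparatorChar = 0xff           // illustrative separator value

	dst = append(dst, nsPrefixDateMetricNameToTSID)
	var dateBuf [8]byte
	binary.BigEndian.PutUint64(dateBuf[:], date) // day number = timestamp / msecPerDay
	dst = append(dst, dateBuf[:]...)
	dst = append(dst, canonicalMetricName...)
	dst = append(dst, kvSeparatorChar)
	dst = append(dst, marshaledTSID...)
	return dst
}

func main() {
	entry := marshalPerDayEntry(nil, 19500, []byte("http_requests_total"), []byte{0x01, 0x02})
	fmt.Printf("%x\n", entry)
}
```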
This change removes an optimization for reducing CPU and disk IO spikes at indexdb rotation,
since it didn't work correctly - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 .
At the same time the change fixes an issue which could result in losing access to time series
that stop receiving new samples during the first hour after indexdb rotation - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
The issue with the increased CPU and disk IO usage during indexdb rotation will be addressed
in a separate commit according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401#issuecomment-1553488685
This is a follow-up for 1f28b46ae9
parent 1bce67df06
commit e1cf962bad
9 changed files with 458 additions and 622 deletions
|
@ -467,12 +467,6 @@ func registerStorageMetrics(strg *storage.Storage) {
|
|||
return float64(idbm().PartsRefCount)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
|
||||
return float64(idbm().NewTimeseriesCreated)
|
||||
})
|
||||
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
|
||||
return float64(idbm().TimeseriesRepopulated)
|
||||
})
|
||||
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
|
||||
return float64(idbm().MissingTSIDsForMetricID)
|
||||
})
|
||||
|
@ -587,6 +581,12 @@ func registerStorageMetrics(strg *storage.Storage) {
|
|||
return float64(m().TooSmallTimestampRows)
|
||||
})
|
||||
|
||||
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
|
||||
return float64(m().TimeseriesRepopulated)
|
||||
})
|
||||
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
|
||||
return float64(m().NewTimeseriesCreated)
|
||||
})
|
||||
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
|
||||
return float64(m().SlowRowInserts)
|
||||
})
|
||||
|
|
|
@ -156,7 +156,8 @@ func (api *vmstorageAPI) DeleteSeries(qt *querytracer.Tracer, sq *storage.Search
|
|||
}
|
||||
|
||||
func (api *vmstorageAPI) RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadline uint64) error {
|
||||
return api.s.RegisterMetricNames(qt, mrs)
|
||||
api.s.RegisterMetricNames(qt, mrs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *vmstorageAPI) setupTfss(qt *querytracer.Tracer, sq *storage.SearchQuery, tr storage.TimeRange, maxMetrics int, deadline uint64) ([]*storage.TagFilters, error) {
|
||||
|
|
|
@ -27,7 +27,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
|
||||
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
|
||||
|
||||
|
||||
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). The change significantly reduces the size of `indexdb/dataBlocks` cache for such setups. The cache size can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_cache_size_bytes{type="indexdb/dataBlocks"}` metric.
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402).
|
||||
|
|
|
@ -31,7 +31,17 @@ import (
|
|||
|
||||
const (
|
||||
// Prefix for MetricName->TSID entries.
|
||||
nsPrefixMetricNameToTSID = 0
|
||||
//
|
||||
// This index was substituted with nsPrefixDateMetricNameToTSID,
|
||||
// since the MetricName->TSID index may require big amounts of memory for indexdb/dataBlocks cache
|
||||
// when it grows big on the configured retention under high churn rate
|
||||
// (e.g. when new time series are constantly registered).
|
||||
//
|
||||
// It is much more efficient from memory usage PoV to query per-day MetricName->TSID index
|
||||
// (aka nsPrefixDateMetricNameToTSID) when the TSID must be obtained for the given MetricName
|
||||
// during data ingestion under high churn rate and big retention.
|
||||
//
|
||||
// nsPrefixMetricNameToTSID = 0
|
||||
|
||||
// Prefix for Tag->MetricID entries.
|
||||
nsPrefixTagToMetricIDs = 1
|
||||
|
@ -50,6 +60,9 @@ const (
|
|||
|
||||
// Prefix for (Date,Tag)->MetricID entries.
|
||||
nsPrefixDateTagToMetricIDs = 6
|
||||
|
||||
// Prefix for (Date,MetricName)->TSID entries.
|
||||
nsPrefixDateMetricNameToTSID = 7
|
||||
)
|
||||
|
||||
// indexDB represents an index db.
|
||||
|
@ -59,12 +72,6 @@ type indexDB struct {
|
|||
|
||||
refCount uint64
|
||||
|
||||
// The counter for newly created time series. It can be used for determining time series churn rate.
|
||||
newTimeseriesCreated uint64
|
||||
|
||||
// The counter for time series which were re-populated from previous indexDB after the rotation.
|
||||
timeseriesRepopulated uint64
|
||||
|
||||
// The number of missing MetricID -> TSID entries.
|
||||
// High rate for this value means corrupted indexDB.
|
||||
missingTSIDsForMetricID uint64
|
||||
|
@ -177,8 +184,6 @@ type IndexDBMetrics struct {
|
|||
|
||||
IndexDBRefCount uint64
|
||||
|
||||
NewTimeseriesCreated uint64
|
||||
TimeseriesRepopulated uint64
|
||||
MissingTSIDsForMetricID uint64
|
||||
|
||||
RecentHourMetricIDsSearchCalls uint64
|
||||
|
@ -219,8 +224,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
|
|||
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
|
||||
|
||||
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
|
||||
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
|
||||
m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated)
|
||||
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
|
||||
|
||||
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
|
||||
|
@ -385,33 +388,6 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
|
|||
db.s.metricNameCache.Set(key[:], metricName)
|
||||
}
|
||||
|
||||
// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
|
||||
//
|
||||
// The probability increases from 0 to 100% during the first hour since db rotation.
|
||||
//
|
||||
// It returns true if new index entry was created, and false if it was skipped.
|
||||
func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
|
||||
pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
|
||||
if pMin < 1 {
|
||||
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
|
||||
if p > pMin {
|
||||
// Fast path: there is no need creating indexes for metricNameRaw yet.
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
// Slow path: create indexes for (tsid, metricNameRaw) at db.
|
||||
mn := GetMetricName()
|
||||
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
|
||||
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
|
||||
}
|
||||
mn.sortTags()
|
||||
is.createGlobalIndexes(tsid, mn)
|
||||
is.createPerDayIndexes(date, tsid.MetricID, mn)
|
||||
PutMetricName(mn)
|
||||
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
|
||||
prefix := ^uint64(0)
|
||||
if versioned {
|
||||
|
@ -486,25 +462,28 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
|
|||
return dst, nil
|
||||
}
|
||||
|
||||
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
|
||||
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
|
||||
//
|
||||
// It returns io.EOF if the given mn isn't found locally.
|
||||
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
err := is.getTSIDByMetricName(dst, metricName)
|
||||
db.putIndexSearch(is)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err != io.EOF {
|
||||
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
|
||||
// It returns false if the given metricName isn't found in the indexdb.
|
||||
func (is *indexSearch) getTSIDByMetricName(dst *generationTSID, metricName []byte, date uint64) bool {
|
||||
if is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) {
|
||||
// Fast path - the TSID is found in the current indexdb.
|
||||
dst.generation = is.db.generation
|
||||
return true
|
||||
}
|
||||
|
||||
// Do not search for the TSID in the external storage,
|
||||
// since this function is already called by another indexDB instance.
|
||||
|
||||
// The TSID for the given mn wasn't found.
|
||||
return io.EOF
|
||||
// Slow path - search for the TSID in the previous indexdb
|
||||
ok := false
|
||||
deadline := is.deadline
|
||||
is.db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(0, 0, deadline)
|
||||
ok = is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date)
|
||||
extDB.putIndexSearch(is)
|
||||
if ok {
|
||||
dst.generation = extDB.generation
|
||||
}
|
||||
})
|
||||
return ok
|
||||
}
|
||||
|
||||
type indexSearch struct {
|
||||
|
@ -518,55 +497,6 @@ type indexSearch struct {
|
|||
|
||||
// deadline in unix timestamp seconds for the given search.
|
||||
deadline uint64
|
||||
|
||||
// tsidByNameMisses and tsidByNameSkips is used for a performance
|
||||
// hack in GetOrCreateTSIDByName. See the comment there.
|
||||
tsidByNameMisses int
|
||||
tsidByNameSkips int
|
||||
}
|
||||
|
||||
// GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
|
||||
//
|
||||
// It also registers the metricName in global and per-day indexes
|
||||
// for the given date if the metricName->TSID entry is missing in the index.
|
||||
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
|
||||
// A hack: skip searching for the TSID after many serial misses.
|
||||
// This should improve insertion performance for big batches
|
||||
// of new time series.
|
||||
if is.tsidByNameMisses < 100 {
|
||||
err := is.getTSIDByMetricName(dst, metricName)
|
||||
if err == nil {
|
||||
// Fast path - the TSID for the given metricName has been found in the index.
|
||||
is.tsidByNameMisses = 0
|
||||
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
|
||||
return err
|
||||
}
|
||||
// There is no need in checking whether the TSID is present in the per-day index for the given date,
|
||||
// since this check must be performed by the caller in an optimized way.
|
||||
// See storage.updatePerDateData() function.
|
||||
return nil
|
||||
}
|
||||
if err != io.EOF {
|
||||
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
|
||||
return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err)
|
||||
}
|
||||
is.tsidByNameMisses++
|
||||
} else {
|
||||
is.tsidByNameSkips++
|
||||
if is.tsidByNameSkips > 10000 {
|
||||
is.tsidByNameSkips = 0
|
||||
is.tsidByNameMisses = 0
|
||||
}
|
||||
}
|
||||
|
||||
// TSID for the given name wasn't found. Create it.
|
||||
// It is OK if duplicate TSID for mn is created by concurrent goroutines.
|
||||
// Metric results will be merged by mn after TableSearch.
|
||||
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
|
||||
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
|
||||
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *indexDB) getIndexSearch(accountID, projectID uint32, deadline uint64) *indexSearch {
|
||||
|
@ -592,75 +522,9 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
|
|||
is.projectID = 0
|
||||
is.deadline = 0
|
||||
|
||||
// Do not reset tsidByNameMisses and tsidByNameSkips,
|
||||
// since they are used in GetOrCreateTSIDByName across call boundaries.
|
||||
|
||||
db.indexSearchPool.Put(is)
|
||||
}
|
||||
|
||||
func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
|
||||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
if err := mn.Unmarshal(metricName); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
|
||||
}
|
||||
|
||||
created, err := is.db.getOrCreateTSID(dst, metricName, mn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot generate TSID: %w", err)
|
||||
}
|
||||
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
|
||||
return err
|
||||
}
|
||||
is.createGlobalIndexes(dst, mn)
|
||||
is.createPerDayIndexes(date, dst.MetricID, mn)
|
||||
|
||||
// There is no need in invalidating tag cache, since it is invalidated
|
||||
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
|
||||
|
||||
if created {
|
||||
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
|
||||
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
|
||||
if logNewSeries {
|
||||
logger.Infof("new series created: %s", mn.String())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLogNewSeries updates new series logging.
|
||||
//
|
||||
// This function must be called before any calling any storage functions.
|
||||
func SetLogNewSeries(ok bool) {
|
||||
logNewSeries = ok
|
||||
}
|
||||
|
||||
var logNewSeries = false
|
||||
|
||||
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
|
||||
//
|
||||
// Returns true if TSID was created or false if TSID was in extDB
|
||||
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
|
||||
// Search the TSID in the external storage.
|
||||
// This is usually the db from the previous period.
|
||||
var err error
|
||||
if db.doExtDB(func(extDB *indexDB) {
|
||||
err = extDB.getTSIDByNameNoCreate(dst, metricName)
|
||||
}) {
|
||||
if err == nil {
|
||||
// The TSID has been found in the external storage.
|
||||
return false, nil
|
||||
}
|
||||
if err != io.EOF {
|
||||
return false, fmt.Errorf("external search failed: %w", err)
|
||||
}
|
||||
}
|
||||
// The TSID wasn't found in the external storage.
|
||||
// Generate it locally.
|
||||
generateTSID(dst, mn)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func generateTSID(dst *TSID, mn *MetricName) {
|
||||
dst.AccountID = mn.AccountID
|
||||
dst.ProjectID = mn.ProjectID
|
||||
|
@ -686,13 +550,6 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
|
|||
ii := getIndexItems()
|
||||
defer putIndexItems(ii)
|
||||
|
||||
// Create MetricName -> TSID index.
|
||||
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
|
||||
ii.B = mn.Marshal(ii.B)
|
||||
ii.B = append(ii.B, kvSeparatorChar)
|
||||
ii.B = tsid.Marshal(ii.B)
|
||||
ii.Next()
|
||||
|
||||
// Create MetricID -> MetricName index.
|
||||
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
|
||||
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
|
||||
|
@ -2089,26 +1946,28 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, proj
|
|||
|
||||
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
|
||||
func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte, date uint64) bool {
|
||||
dmis := is.db.s.getDeletedMetricIDs()
|
||||
ts := &is.ts
|
||||
kb := &is.kb
|
||||
kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
|
||||
// Do not use marshalCommonPrefix() here, since mn already contains (AccountID, ProjectID)
|
||||
kb.B = append(kb.B, nsPrefixDateMetricNameToTSID)
|
||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||
kb.B = append(kb.B, metricName...)
|
||||
kb.B = append(kb.B, kvSeparatorChar)
|
||||
ts.Seek(kb.B)
|
||||
for ts.NextItem() {
|
||||
if !bytes.HasPrefix(ts.Item, kb.B) {
|
||||
// Nothing found.
|
||||
return io.EOF
|
||||
return false
|
||||
}
|
||||
v := ts.Item[len(kb.B):]
|
||||
tail, err := dst.Unmarshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot unmarshal TSID: %w", err)
|
||||
logger.Panicf("FATAL: cannot unmarshal TSID: %s", err)
|
||||
}
|
||||
if len(tail) > 0 {
|
||||
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
|
||||
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling TSID: %X", tail)
|
||||
}
|
||||
if dmis.Len() > 0 {
|
||||
// Verify whether the dst is marked as deleted.
|
||||
|
@ -2118,13 +1977,13 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
|
|||
}
|
||||
}
|
||||
// Found valid dst.
|
||||
return nil
|
||||
return true
|
||||
}
|
||||
if err := ts.Error(); err != nil {
|
||||
return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
|
||||
logger.Panicf("FATAL: error when searching TSID by metricName; searchPrefix %q: %s", kb.B, err)
|
||||
}
|
||||
// Nothing found
|
||||
return io.EOF
|
||||
return false
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
|
||||
|
@ -2975,13 +2834,23 @@ const (
|
|||
int64Max = int64((1 << 63) - 1)
|
||||
)
|
||||
|
||||
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
|
||||
func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) {
|
||||
ii := getIndexItems()
|
||||
defer putIndexItems(ii)
|
||||
|
||||
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID, mn.AccountID, mn.ProjectID)
|
||||
ii.B = encoding.MarshalUint64(ii.B, date)
|
||||
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
|
||||
ii.Next()
|
||||
|
||||
// Create per-day inverted index entries for TSID.
|
||||
//
|
||||
// Do not use marshalCommonPrefix() here, since mn already contains (AccountID, ProjectID)
|
||||
ii.B = append(ii.B, nsPrefixDateMetricNameToTSID)
|
||||
ii.B = encoding.MarshalUint64(ii.B, date)
|
||||
ii.B = mn.Marshal(ii.B)
|
||||
ii.B = append(ii.B, kvSeparatorChar)
|
||||
ii.B = tsid.Marshal(ii.B)
|
||||
ii.Next()
|
||||
|
||||
// Create per-day inverted index entries for metricID.
|
||||
|
@ -2989,9 +2858,8 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName
|
|||
defer kbPool.Put(kb)
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||
ii.registerTagIndexes(kb.B, mn, metricID)
|
||||
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
|
||||
is.db.tb.AddItems(ii.Items)
|
||||
is.db.s.dateMetricIDCache.Set(date, metricID)
|
||||
}
|
||||
|
||||
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
|
||||
|
@ -3101,22 +2969,24 @@ func reverseBytes(dst, src []byte) []byte {
|
|||
return dst
|
||||
}
|
||||
|
||||
func (is *indexSearch) hasDateMetricID(date, metricID uint64, accountID, projectID uint32) (bool, error) {
|
||||
func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64, accountID, projectID uint32) bool {
|
||||
ts := &is.ts
|
||||
kb := &is.kb
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID, accountID, projectID)
|
||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
||||
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
||||
if err == io.EOF {
|
||||
return false, nil
|
||||
err := ts.FirstItemWithPrefix(kb.B)
|
||||
if err == nil {
|
||||
if string(ts.Item) != string(kb.B) {
|
||||
logger.Panicf("FATAL: unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
|
||||
}
|
||||
return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err)
|
||||
// Fast path - the (date, metricID) entry is found in the current indexdb.
|
||||
return true
|
||||
}
|
||||
if string(ts.Item) != string(kb.B) {
|
||||
return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
|
||||
if err != io.EOF {
|
||||
logger.Panicf("FATAL: unexpected error when searching for (date=%s, metricID=%d) entry: %s", dateToString(date), metricID, err)
|
||||
}
|
||||
return true, nil
|
||||
return false
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
|
||||
|
|
|
@ -507,12 +507,11 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIndexDBOpenClose(t *testing.T) {
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
var s Storage
|
||||
tableName := nextIndexDBTableName()
|
||||
for i := 0; i < 5; i++ {
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(tableName, s, 0, &isReadOnly)
|
||||
db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly)
|
||||
db.MustClose()
|
||||
}
|
||||
if err := os.RemoveAll(tableName); err != nil {
|
||||
|
@ -526,19 +525,10 @@ func TestIndexDB(t *testing.T) {
|
|||
const metricGroups = 10
|
||||
|
||||
t.Run("serial", func(t *testing.T) {
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
t.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
const path = "TestIndexDB-serial"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
|
||||
db := s.idb()
|
||||
mns, tsids, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
|
@ -547,27 +537,23 @@ func TestIndexDB(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
// Re-open the db and verify it works as expected.
|
||||
db.MustClose()
|
||||
db = mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
// Re-open the storage and verify it works as expected.
|
||||
s.MustClose()
|
||||
s = MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
|
||||
db = s.idb()
|
||||
if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
})
|
||||
|
||||
t.Run("concurrent", func(t *testing.T) {
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
t.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
const path = "TestIndexDB-concurrent"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
db := s.idb()
|
||||
|
||||
ch := make(chan error, 3)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
|
@ -584,20 +570,20 @@ func TestIndexDB(t *testing.T) {
|
|||
ch <- nil
|
||||
}()
|
||||
}
|
||||
var errors []error
|
||||
deadlineCh := time.After(30 * time.Second)
|
||||
for i := 0; i < cap(ch); i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
errors = append(errors, fmt.Errorf("unexpected error: %w", err))
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(30 * time.Second):
|
||||
case <-deadlineCh:
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
t.Fatal(errors[0])
|
||||
}
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -610,11 +596,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
|
|||
tenants := make(map[string]struct{})
|
||||
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
|
||||
date := uint64(timestampFromTime(time.Now())) / msecPerDay
|
||||
|
||||
var metricNameBuf []byte
|
||||
var metricNameRawBuf []byte
|
||||
for i := 0; i < 4e2+1; i++ {
|
||||
for i := 0; i < 401; i++ {
|
||||
var mn MetricName
|
||||
mn.AccountID = uint32((i + 2) % accountsCount)
|
||||
mn.ProjectID = uint32((i + 1) % projectsCount)
|
||||
|
@ -636,30 +623,26 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
|
|||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||
|
||||
// Create tsid for the metricName.
|
||||
var tsid TSID
|
||||
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
|
||||
var genTSID generationTSID
|
||||
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
|
||||
}
|
||||
if tsid.AccountID != mn.AccountID {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected TSID.AccountID; got %d; want %d; mn:\n%s\ntsid:\n%+v", tsid.AccountID, mn.AccountID, &mn, &tsid)
|
||||
if genTSID.TSID.AccountID != mn.AccountID {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected TSID.AccountID; got %d; want %d; mn:\n%s\ntsid:\n%+v", genTSID.TSID.AccountID, mn.AccountID, &mn, &genTSID.TSID)
|
||||
}
|
||||
if tsid.ProjectID != mn.ProjectID {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected TSID.ProjectID; got %d; want %d; mn:\n%s\ntsid:\n%+v", tsid.ProjectID, mn.ProjectID, &mn, &tsid)
|
||||
if genTSID.TSID.ProjectID != mn.ProjectID {
|
||||
return nil, nil, nil, fmt.Errorf("unexpected TSID.ProjectID; got %d; want %d; mn:\n%s\ntsid:\n%+v", genTSID.TSID.ProjectID, mn.ProjectID, &mn, &genTSID.TSID)
|
||||
}
|
||||
|
||||
mns = append(mns, mn)
|
||||
tsids = append(tsids, tsid)
|
||||
}
|
||||
|
||||
// fill Date -> MetricID cache
|
||||
date := uint64(timestampFromTime(time.Now())) / msecPerDay
|
||||
for i := range tsids {
|
||||
tsid := &tsids[i]
|
||||
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
|
||||
tsids = append(tsids, genTSID.TSID)
|
||||
}
|
||||
db.putIndexSearch(is)
|
||||
|
||||
// Flush index to disk, so it becomes visible for search
|
||||
db.tb.DebugFlush()
|
||||
db.s.DebugFlush()
|
||||
|
||||
var tenantsList []string
|
||||
for tenant := range tenants {
|
||||
|
@ -679,9 +662,10 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten
|
|||
return false
|
||||
}
|
||||
|
||||
currentTime := timestampFromTime(time.Now())
|
||||
allLabelNames := make(map[accountProjectKey]map[string]bool)
|
||||
timeseriesCounters := make(map[accountProjectKey]map[uint64]bool)
|
||||
var tsidCopy TSID
|
||||
var genTSID generationTSID
|
||||
var metricNameCopy []byte
|
||||
for i := range mns {
|
||||
mn := &mns[i]
|
||||
|
@ -701,26 +685,29 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten
|
|||
mn.sortTags()
|
||||
metricName := mn.Marshal(nil)
|
||||
|
||||
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil {
|
||||
return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err)
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
if !is.getTSIDByMetricName(&genTSID, metricName, uint64(currentTime)/msecPerDay) {
|
||||
return fmt.Errorf("cannot obtain tsid #%d for mn %s", i, mn)
|
||||
}
|
||||
db.putIndexSearch(is)
|
||||
|
||||
if isConcurrent {
|
||||
// Copy tsid.MetricID, since multiple TSIDs may match
|
||||
// the same mn in concurrent mode.
|
||||
tsidCopy.MetricID = tsid.MetricID
|
||||
genTSID.TSID.MetricID = tsid.MetricID
|
||||
}
|
||||
if !reflect.DeepEqual(tsid, &tsidCopy) {
|
||||
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &tsidCopy, tsid)
|
||||
if !reflect.DeepEqual(tsid, &genTSID.TSID) {
|
||||
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &genTSID.TSID, tsid)
|
||||
}
|
||||
|
||||
// Search for metric name for the given metricID.
|
||||
var err error
|
||||
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID, tsidCopy.AccountID, tsidCopy.ProjectID)
|
||||
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID, genTSID.TSID.AccountID, genTSID.TSID.ProjectID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
|
||||
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", genTSID.TSID.MetricID, i, err)
|
||||
}
|
||||
if !bytes.Equal(metricName, metricNameCopy) {
|
||||
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
|
||||
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", genTSID.TSID.MetricID, metricNameCopy, metricName)
|
||||
}
|
||||
|
||||
// Try searching metric name for non-existent MetricID.
|
||||
|
@ -785,7 +772,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten
|
|||
}
|
||||
|
||||
// Test SearchTenants on specific time range
|
||||
currentTime := timestampFromTime(time.Now())
|
||||
tr := TimeRange{
|
||||
MinTimestamp: currentTime - msecPerDay,
|
||||
MaxTimestamp: currentTime + msecPerDay,
|
||||
|
@ -815,10 +801,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, ten
|
|||
}
|
||||
|
||||
// Try tag filters.
|
||||
tr = TimeRange{
|
||||
MinTimestamp: currentTime - msecPerDay,
|
||||
MaxTimestamp: currentTime + msecPerDay,
|
||||
}
|
||||
for i := range mns {
|
||||
mn := &mns[i]
|
||||
tsid := &tsids[i]
|
||||
|
@ -1557,12 +1539,6 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
|||
r := rand.New(rand.NewSource(1))
|
||||
path := "TestIndexRepopulateAfterRotation"
|
||||
s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5)
|
||||
defer func() {
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}()
|
||||
|
||||
db := s.idb()
|
||||
if db.generation == 0 {
|
||||
|
@ -1570,9 +1546,11 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
|||
}
|
||||
|
||||
const metricRowsN = 1000
|
||||
// use min-max timestamps of 1month range to create smaller number of partitions
|
||||
timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now()
|
||||
mrs := testGenerateMetricRows(r, metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli())
|
||||
|
||||
currentDayTimestamp := (time.Now().UnixMilli() / msecPerDay) * msecPerDay
|
||||
timeMin := currentDayTimestamp - 24*3600*1000
|
||||
timeMax := currentDayTimestamp + 24*3600*1000
|
||||
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
|
||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
||||
}
|
||||
|
@ -1586,7 +1564,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
|||
}
|
||||
|
||||
// check new series were registered in indexDB
|
||||
added := atomic.LoadUint64(&db.newTimeseriesCreated)
|
||||
added := atomic.LoadUint64(&db.s.newTimeseriesCreated)
|
||||
if added != metricRowsN {
|
||||
t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added)
|
||||
}
|
||||
|
@ -1626,50 +1604,34 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
|||
t.Fatalf("expected new indexDB generation %d to be different from prev indexDB", dbNew.generation)
|
||||
}
|
||||
|
||||
// Re-insert rows again and verify that entries belong prevGeneration and dbNew.generation,
|
||||
// while the majority of entries remain at prevGeneration.
|
||||
// Re-insert rows again and verify that all the entries belong to new generation
|
||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
||||
}
|
||||
s.DebugFlush()
|
||||
entriesByGeneration := make(map[uint64]int)
|
||||
|
||||
for _, mr := range mrs {
|
||||
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
|
||||
entriesByGeneration[genTSID.generation]++
|
||||
if genTSID.generation != dbNew.generation {
|
||||
t.Fatalf("unexpected generation for data after rotation; got %d; want %d", genTSID.generation, dbNew.generation)
|
||||
}
|
||||
}
|
||||
if len(entriesByGeneration) > 2 {
|
||||
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
|
||||
}
|
||||
prevEntries := entriesByGeneration[prevGeneration]
|
||||
currEntries := entriesByGeneration[dbNew.generation]
|
||||
totalEntries := prevEntries + currEntries
|
||||
if totalEntries != metricRowsN {
|
||||
t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN)
|
||||
}
|
||||
if float64(currEntries)/float64(totalEntries) > 0.1 {
|
||||
t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
|
||||
|
||||
s.MustClose()
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
t.Fatalf("cannot remove %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
t.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
const path = "TestSearchTSIDWithTimeRange"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
db := s.idb()
|
||||
|
||||
// Create a bunch of per-day time series
|
||||
const accountID = 12345
|
||||
const projectID = 85453
|
||||
is := db.getIndexSearch(accountID, projectID, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
const days = 5
|
||||
const metricsPerDay = 1000
|
||||
theDay := time.Date(2019, time.October, 15, 5, 1, 0, 0, time.UTC)
|
||||
|
@ -1687,8 +1649,8 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
}
|
||||
sort.Strings(labelNames)
|
||||
for day := 0; day < days; day++ {
|
||||
var tsids []TSID
|
||||
var mns []MetricName
|
||||
date := baseDate - uint64(day)
|
||||
var metricIDs uint64set.Set
|
||||
for metric := 0; metric < metricsPerDay; metric++ {
|
||||
var mn MetricName
|
||||
mn.AccountID = accountID
|
||||
|
@ -1714,37 +1676,30 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
|
||||
metricNameBuf = mn.Marshal(metricNameBuf[:0])
|
||||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||
var tsid TSID
|
||||
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
|
||||
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
|
||||
var genTSID generationTSID
|
||||
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
|
||||
}
|
||||
if tsid.AccountID != accountID {
|
||||
t.Fatalf("unexpected accountID; got %d; want %d", tsid.AccountID, accountID)
|
||||
if genTSID.TSID.AccountID != accountID {
|
||||
t.Fatalf("unexpected accountID; got %d; want %d", genTSID.TSID.AccountID, accountID)
|
||||
}
|
||||
if tsid.ProjectID != projectID {
|
||||
t.Fatalf("unexpected accountID; got %d; want %d", tsid.ProjectID, projectID)
|
||||
if genTSID.TSID.ProjectID != projectID {
|
||||
t.Fatalf("unexpected accountID; got %d; want %d", genTSID.TSID.ProjectID, projectID)
|
||||
}
|
||||
mns = append(mns, mn)
|
||||
tsids = append(tsids, tsid)
|
||||
metricIDs.Add(genTSID.TSID.MetricID)
|
||||
}
|
||||
|
||||
// Add the metrics to the per-day stores
|
||||
date := baseDate - uint64(day)
|
||||
var metricIDs uint64set.Set
|
||||
for i := range tsids {
|
||||
tsid := &tsids[i]
|
||||
metricIDs.Add(tsid.MetricID)
|
||||
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
|
||||
}
|
||||
allMetricIDs.Union(&metricIDs)
|
||||
perDayMetricIDs[date] = &metricIDs
|
||||
}
|
||||
db.putIndexSearch(is)
|
||||
|
||||
// Flush index to disk, so it becomes visible for search
|
||||
db.tb.DebugFlush()
|
||||
s.DebugFlush()
|
||||
|
||||
is2 := db.getIndexSearch(accountID, projectID, noDeadline)
|
||||
defer db.putIndexSearch(is2)
|
||||
|
||||
// Check that all the metrics are found for all the days.
|
||||
for date := baseDate - days + 1; date <= baseDate; date++ {
|
||||
|
@ -1765,6 +1720,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
if !allMetricIDs.Equal(metricIDs) {
|
||||
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
|
||||
}
|
||||
db.putIndexSearch(is2)
|
||||
|
||||
// Check SearchLabelNamesWithFiltersOnTimeRange with the specified time range.
|
||||
tr := TimeRange{
|
||||
|
@ -2110,6 +2066,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
|||
if status.TotalLabelValuePairs != expectedLabelValuePairs {
|
||||
t.Fatalf("unexpected TotalLabelValuePairs; got %d; want %d", status.TotalLabelValuePairs, expectedLabelValuePairs)
|
||||
}
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
||||
func toTFPointers(tfs []tagFilter) []*tagFilter {
|
||||
|
@ -2131,8 +2090,6 @@ func newTestStorage() *Storage {
|
|||
retentionMsecs: maxRetentionMsecs,
|
||||
}
|
||||
s.setDeletedMetricIDs(&uint64set.Set{})
|
||||
var idb *indexDB
|
||||
s.idbCurr.Store(idb)
|
||||
return s
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,13 @@ package storage
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
)
|
||||
|
||||
func BenchmarkRegexpFilterMatch(b *testing.B) {
|
||||
|
@ -39,21 +40,12 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) {
|
|||
}
|
||||
|
||||
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||
const path = "BenchmarkIndexDBAddTSIDs"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
db := s.idb()
|
||||
|
||||
const recordsPerLoop = 1e3
|
||||
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
b.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
var goroutineID uint32
|
||||
|
||||
b.ReportAllocs()
|
||||
|
@ -61,7 +53,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var mn MetricName
|
||||
var tsid TSID
|
||||
var genTSID generationTSID
|
||||
mn.AccountID = atomic.AddUint32(&goroutineID, 1)
|
||||
|
||||
// The most common tags.
|
||||
|
@ -76,15 +68,18 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
|||
|
||||
startOffset := 0
|
||||
for pb.Next() {
|
||||
benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop)
|
||||
benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, startOffset, recordsPerLoop)
|
||||
startOffset += recordsPerLoop
|
||||
}
|
||||
})
|
||||
b.StopTimer()
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
||||
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
||||
var metricName []byte
|
||||
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
||||
date := uint64(0)
|
||||
var metricNameRaw []byte
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
|
@ -94,39 +89,30 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
|
|||
mn.Tags[j].Value = strconv.AppendUint(mn.Tags[j].Value[:0], uint64(i*j), 16)
|
||||
}
|
||||
mn.sortTags()
|
||||
metricName = mn.Marshal(metricName[:0])
|
||||
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
|
||||
panic(fmt.Errorf("cannot insert record: %w", err))
|
||||
}
|
||||
generateTSID(&genTSID.TSID, mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||
// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
|
||||
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
b.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
const path = "BenchmarkHeadPostingForMatchers"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
db := s.idb()
|
||||
|
||||
// Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66
|
||||
const accountID = 34327843
|
||||
const projectID = 893433
|
||||
var mn MetricName
|
||||
var metricName []byte
|
||||
var metricNameRaw []byte
|
||||
var tsid TSID
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
var mn MetricName
|
||||
var metricNameRaw []byte
|
||||
var genTSID generationTSID
|
||||
date := uint64(0)
|
||||
addSeries := func(kvs ...string) {
|
||||
mn.Reset()
|
||||
for i := 0; i < len(kvs); i += 2 {
|
||||
|
@ -135,11 +121,10 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
mn.sortTags()
|
||||
mn.AccountID = accountID
|
||||
mn.ProjectID = projectID
|
||||
metricName = mn.Marshal(metricName[:0])
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
|
||||
b.Fatalf("cannot insert record: %s", err)
|
||||
}
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
|
||||
}
|
||||
for n := 0; n < 10; n++ {
|
||||
ns := strconv.Itoa(n)
|
||||
|
@ -155,12 +140,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
}
|
||||
|
||||
// Make sure all the items can be searched.
|
||||
db.tb.DebugFlush()
|
||||
db.s.DebugFlush()
|
||||
b.ResetTimer()
|
||||
|
||||
benchSearch := func(b *testing.B, tfs *TagFilters, expectedMetricIDs int) {
|
||||
is := db.getIndexSearch(tfs.accountID, tfs.projectID, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
tfss := []*TagFilters{tfs}
|
||||
tr := TimeRange{
|
||||
MinTimestamp: 0,
|
||||
|
@ -175,6 +159,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
b.Fatalf("unexpected metricIDs found; got %d; want %d", len(metricIDs), expectedMetricIDs)
|
||||
}
|
||||
}
|
||||
db.putIndexSearch(is)
|
||||
}
|
||||
addTagFilter := func(tfs *TagFilters, key, value string, isNegative, isRegexp bool) {
|
||||
if err := tfs.Add([]byte(key), []byte(value), isNegative, isRegexp); err != nil {
|
||||
|
@ -283,21 +268,15 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
|||
addTagFilter(tfs, "j", "foo", false, false)
|
||||
benchSearch(b, tfs, 88889)
|
||||
})
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
||||
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||
s := newTestStorage()
|
||||
defer stopTestStorage(s)
|
||||
|
||||
dbName := nextIndexDBTableName()
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(dbName, s, 0, &isReadOnly)
|
||||
defer func() {
|
||||
db.MustClose()
|
||||
if err := os.RemoveAll(dbName); err != nil {
|
||||
b.Fatalf("cannot remove indexDB: %s", err)
|
||||
}
|
||||
}()
|
||||
const path = "BenchmarkIndexDBGetTSIDs"
|
||||
s := MustOpenStorage(path, maxRetentionMsecs, 0, 0)
|
||||
db := s.idb()
|
||||
|
||||
const recordsPerLoop = 1000
|
||||
const accountsCount = 111
|
||||
|
@ -312,33 +291,33 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
|||
value := fmt.Sprintf("value_%d", i)
|
||||
mn.AddTag(key, value)
|
||||
}
|
||||
var tsid TSID
|
||||
var metricName []byte
|
||||
var genTSID generationTSID
|
||||
var metricNameRaw []byte
|
||||
date := uint64(0)
|
||||
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
|
||||
for i := 0; i < recordsCount; i++ {
|
||||
mn.AccountID = uint32(i % accountsCount)
|
||||
mn.ProjectID = uint32(i % projectsCount)
|
||||
mn.sortTags()
|
||||
metricName = mn.Marshal(metricName[:0])
|
||||
metricNameRaw = mn.marshalRaw(metricName[:0])
|
||||
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
|
||||
b.Fatalf("cannot insert record: %s", err)
|
||||
}
|
||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||
generateTSID(&genTSID.TSID, &mn)
|
||||
genTSID.generation = db.generation
|
||||
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
|
||||
}
|
||||
db.s.DebugFlush()
|
||||
|
||||
b.SetBytes(recordsPerLoop)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
var tsidLocal TSID
|
||||
var genTSIDLocal generationTSID
|
||||
var metricNameLocal []byte
|
||||
var metricNameLocalRaw []byte
|
||||
mnLocal := mn
|
||||
is := db.getIndexSearch(0, 0, noDeadline)
|
||||
defer db.putIndexSearch(is)
|
||||
for pb.Next() {
|
||||
for i := 0; i < recordsPerLoop; i++ {
|
||||
mnLocal.AccountID = uint32(i % accountsCount)
|
||||
|
@ -346,11 +325,15 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
|||
mnLocal.sortTags()
|
||||
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
|
||||
metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0])
|
||||
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil {
|
||||
panic(fmt.Errorf("cannot obtain tsid: %w", err))
|
||||
if !is.getTSIDByMetricName(&genTSIDLocal, metricNameLocal, date) {
|
||||
panic(fmt.Errorf("cannot obtain tsid for row %d", i))
|
||||
}
|
||||
}
|
||||
}
|
||||
db.putIndexSearch(is)
|
||||
})
|
||||
b.StopTimer()
|
||||
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
}
|
||||
|
|
|
@ -172,14 +172,6 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
pt := mustCreatePartition(ptt, "small-table", "big-table", strg)
|
||||
smallPartsPath := pt.smallPartsPath
|
||||
bigPartsPath := pt.bigPartsPath
|
||||
defer func() {
|
||||
if err := os.RemoveAll("small-table"); err != nil {
|
||||
t.Fatalf("cannot remove small parts directory: %s", err)
|
||||
}
|
||||
if err := os.RemoveAll("big-table"); err != nil {
|
||||
t.Fatalf("cannot remove big parts directory: %s", err)
|
||||
}
|
||||
}()
|
||||
var tmpRows []rawRow
|
||||
for _, rows := range rowss {
|
||||
pt.AddRows(rows)
|
||||
|
@ -194,6 +186,13 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
|
|||
pt = mustOpenPartition(smallPartsPath, bigPartsPath, strg)
|
||||
testPartitionSearch(t, pt, tsids, tr, rbsExpected, rowsCountExpected)
|
||||
pt.MustClose()
|
||||
|
||||
if err := os.RemoveAll("small-table"); err != nil {
|
||||
t.Fatalf("cannot remove small parts directory: %s", err)
|
||||
}
|
||||
if err := os.RemoveAll("big-table"); err != nil {
|
||||
t.Fatalf("cannot remove big parts directory: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testPartitionSearch(t *testing.T, pt *partition, tsids []TSID, tr TimeRange, rbsExpected []rawBlock, rowsCountExpected int64) {
|
||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
|
@ -45,6 +44,8 @@ type Storage struct {
|
|||
tooSmallTimestampRows uint64
|
||||
tooBigTimestampRows uint64
|
||||
|
||||
timeseriesRepopulated uint64
|
||||
newTimeseriesCreated uint64
|
||||
slowRowInserts uint64
|
||||
slowPerDayIndexInserts uint64
|
||||
slowMetricNameLoads uint64
|
||||
|
@ -308,7 +309,11 @@ func (s *Storage) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
|
|||
// since it may slow down data ingestion when used frequently.
|
||||
func (s *Storage) DebugFlush() {
|
||||
s.tb.flushPendingRows()
|
||||
s.idb().tb.DebugFlush()
|
||||
idb := s.idb()
|
||||
idb.tb.DebugFlush()
|
||||
idb.doExtDB(func(extDB *indexDB) {
|
||||
extDB.tb.DebugFlush()
|
||||
})
|
||||
}
|
||||
|
||||
// CreateSnapshot creates snapshot for s and returns the snapshot name.
|
||||
|
@ -455,6 +460,8 @@ type Metrics struct {
|
|||
TooSmallTimestampRows uint64
|
||||
TooBigTimestampRows uint64
|
||||
|
||||
TimeseriesRepopulated uint64
|
||||
NewTimeseriesCreated uint64
|
||||
SlowRowInserts uint64
|
||||
SlowPerDayIndexInserts uint64
|
||||
SlowMetricNameLoads uint64
|
||||
|
@ -524,6 +531,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
|
||||
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
|
||||
|
||||
m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated)
|
||||
m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated)
|
||||
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
|
||||
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
|
||||
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
|
||||
|
@ -738,8 +747,9 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
// Persist changes on the file system.
|
||||
fs.MustSyncPath(s.path)
|
||||
|
||||
// Do not flush tsidCache to avoid read/write path slowdown
|
||||
// and slowly re-populate new idb with entries from the cache via maybeCreateIndexes().
|
||||
// Do not flush tsidCache to avoid read/write path slowdown.
|
||||
// The cache is automatically re-populated with new TSID entries
|
||||
// with the updated indexdb generation.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
||||
|
||||
// Flush metric id caches for the current and the previous hour,
|
||||
|
@ -753,7 +763,7 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
// These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb.
|
||||
// 4. Stop adding new samples for these series just before 5 UTC.
|
||||
// 5. The next indexdb rotation is performed at 4 UTC next day.
|
||||
// The information about the series from step 5 disappears from indexdb, since the old indexdb from step 1 is deleted,
|
||||
// The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
|
||||
// while the current indexdb doesn't contain information about the series.
|
||||
// So queries for the last 24 hours stop returning samples added at step 3.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
|
||||
|
@ -830,7 +840,6 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
|
|||
name := "next_day_metric_ids"
|
||||
path := filepath.Join(s.cachePath, name)
|
||||
if !fs.IsPathExist(path) {
|
||||
logger.Infof("nothing to load from %q", path)
|
||||
return e
|
||||
}
|
||||
src, err := os.ReadFile(path)
|
||||
|
@ -870,7 +879,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
|||
}
|
||||
path := filepath.Join(s.cachePath, name)
|
||||
if !fs.IsPathExist(path) {
|
||||
logger.Infof("nothing to load from %q", path)
|
||||
return hm
|
||||
}
|
||||
src, err := os.ReadFile(path)
|
||||
|
@@ -1243,8 +1251,7 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int,
if err != nil {
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
}
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
// Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs.

// Do not reset MetricID->MetricName cache, since it must be used only
// after filtering out deleted metricIDs.
@@ -1639,61 +1646,129 @@ var metricRowsInsertCtxPool sync.Pool

const maxMetricRowsPerBlock = 8000

// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later.
//
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// The the MetricRow.Timestamp is used for registering the metric name at the given day according to the timestamp.
// Th MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
qt = qt.NewChild("registering %d series", len(mrs))
defer qt.Done()
var metricName []byte
var metricNameBuf []byte
var genTSID generationTSID
mn := GetMetricName()
defer PutMetricName(mn)

var seriesRepopulated uint64

idb := s.idb()
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)
var firstWarn error
for i := range mrs {
mr := &mrs[i]
date := uint64(mr.Timestamp) / msecPerDay
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
// Fast path - mr.MetricNameRaw has been already registered in the current idb.
if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip row, since it exceeds cardinality limit
continue
}
if genTSID.generation == idb.generation {
// Fast path - mr.MetricNameRaw has been already registered in the current idb.
continue
if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.

if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
continue
}
mn.sortTags()

genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
}
continue
}
// Slow path - register mr.MetricNameRaw.

// Slow path - search TSID for the given metricName in indexdb.

// Construct canonical metric name - it is used below.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot umarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
continue
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
date := uint64(mr.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
if errors.Is(err, errSeriesCardinalityExceeded) {
metricNameBuf = mn.Marshal(metricNameBuf[:0])

if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.

if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
continue
}
return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)

if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
continue
}

// Slowest path - there is no TSID in indexdb for the given mr.MetricNameRaw. Create it.
generateTSID(&genTSID.TSID, mn)

if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
continue
}

// Schedule creating TSID indexes instead of creating them synchronously.
// This should keep stable the ingestion rate when new time series are ingested.
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
}

atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)

if firstWarn != nil {
logger.Warnf("cannot create some metrics: %s", firstWarn)
}
return nil
}

func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb()
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)

mn := GetMetricName()
defer PutMetricName(mn)

var (
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
prevTSID TSID
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
var metricNameBuf []byte

var slowInsertsCount uint64
var newSeriesCount uint64
var seriesRepopulated uint64

minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()

var genTSID generationTSID
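The rewritten RegisterMetricNames above no longer returns an error: an invalid row is skipped, only the first problem is remembered in firstWarn, and a single warning is logged after the loop. A small self-contained sketch of that error-handling pattern; processRow and the input type are made up for illustration:

package main

import (
    "fmt"
    "log"
)

// processRows skips invalid rows instead of failing the whole batch,
// remembering only the first problem so the log isn't flooded.
func processRows(rows []string) {
    var firstWarn error
    for _, row := range rows {
        if err := processRow(row); err != nil {
            if firstWarn == nil {
                firstWarn = fmt.Errorf("cannot process row %q: %w", row, err)
            }
            continue // an invalid row must not block the valid ones
        }
    }
    if firstWarn != nil {
        log.Printf("warn: %s", firstWarn)
    }
}

func processRow(row string) error {
    if row == "" {
        return fmt.Errorf("empty row")
    }
    return nil
}

func main() {
    processRows([]string{"a", "", "b"})
}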
@@ -1737,6 +1812,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits

// Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
@@ -1744,99 +1821,109 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
// Fast path - the TSID for the given mr.MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteSeries code for details.

if !s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip row, since it exceeds cardinality limit
j--
continue
}
r.TSID = genTSID.TSID
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteSeries code for details.
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw

if genTSID.generation != idb.generation {
// The found entry is from the previous cache generation,
// so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
// The found TSID is from the previous indexdb. Create it in the current indexdb.
date := uint64(r.Timestamp) / msecPerDay
created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date)
if err != nil {
return fmt.Errorf("cannot create indexes: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
}
continue
}

// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j--
if pmrs == nil {
pmrs = getPendingMetricRows()
}
if err := pmrs.addRow(mr); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = err
}
continue
}
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
mr := pmr.mr
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
slowInsertsCount++
date := uint64(r.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
j--
if errors.Is(err, errSeriesCardinalityExceeded) {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)
}
mn.sortTags()

genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
slowInsertsCount++
}
continue
}

// Slow path - the TSID for the given mr.MetricNameRaw is missing in the cache.
slowInsertsCount++

date := uint64(r.Timestamp) / msecPerDay

// Construct canonical metric name - it is used below.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])

// Search for TSID for the given mr.MetricNameRaw in the indexdb.
if is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
// Slower path - the TSID has been found in indexdb.

if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
j--
continue
}
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)

prevTSID = r.TSID
if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}

r.TSID = genTSID.TSID
prevTSID = genTSID.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}

// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
generateTSID(&genTSID.TSID, mn)

if !s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since it exceeds the configured cardinality limit.
j--
continue
}

genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
newSeriesCount++

r.TSID = genTSID.TSID
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw

if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
putPendingMetricRows(pmrs)
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
}

atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount)
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)

if firstWarn != nil {
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
}
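Taken together, the rewritten Storage.add resolves a TSID in three tiers: a tsidCache hit (fast path), a lookup in the per-day indexdb (slower path, counted as a slow insert), and generation of a brand-new TSID plus index entries (slowest path, counted as a new series). A toy sketch of that resolution order only, using plain maps as stand-ins for tsidCache and the per-day indexdb:

package main

import (
    "fmt"
    "hash/fnv"
)

// Toy stand-ins for tsidCache and the per-day indexdb.
var (
    cacheSketch   = map[string]uint64{}
    indexdbSketch = map[string]uint64{}
)

// dayKey mimics keying the index by (date, canonical metric name).
func dayKey(date uint64, name string) string {
    return fmt.Sprintf("%d/%s", date, name)
}

// resolveTSIDSketch shows the three-tier lookup order: cache hit,
// per-day index hit, or create-from-scratch.
func resolveTSIDSketch(metricNameRaw string, date uint64) uint64 {
    if id, ok := cacheSketch[metricNameRaw]; ok {
        return id // fast path: no index lookup at all
    }
    if id, ok := indexdbSketch[dayKey(date, metricNameRaw)]; ok {
        // slower path: found in the per-day index; cache it for future rows
        cacheSketch[metricNameRaw] = id
        return id
    }
    // slowest path: a never-seen series; generate a new id and register it
    // in both the per-day "index" and the cache
    h := fnv.New64a()
    h.Write([]byte(metricNameRaw))
    id := h.Sum64()
    indexdbSketch[dayKey(date, metricNameRaw)] = id
    cacheSketch[metricNameRaw] = id
    return id
}

func main() {
    fmt.Println(resolveTSIDSketch(`cpu_usage{host="a"}`, 19345))
    fmt.Println(resolveTSIDSketch(`cpu_usage{host="a"}`, 19345)) // second call takes the fast path
}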
@@ -1857,22 +1944,44 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci

var storageAddRowsLogger = logger.WithThrottler("storageAddRows", 5*time.Second)

func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
// SetLogNewSeries updates new series logging.
//
// This function must be called before any calling any storage functions.
func SetLogNewSeries(ok bool) {
logNewSeries = ok
}

var logNewSeries = false

func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) error {
is.createGlobalIndexes(&genTSID.TSID, mn)
is.createPerDayIndexes(date, &genTSID.TSID, mn)

// Store the TSID for for the current indexdb into cache,
// so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(genTSID, metricNameRaw)

// Register the (date, metricID) entry in the cache,
// so next time the entry is found there instead of searching for it in the indexdb.
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)

return nil
}

func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool {
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
return false
}
if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
return false
}
return nil
return true
}

var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")

func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
select {
case <-logSkippedSeriesTicker.C:
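registerSeriesCardinality now reports its decision as a bool instead of returning errSeriesCardinalityExceeded, so callers simply skip the row. A toy limiter with the same shape; the real hourly/daily limiters are time-bucketed probabilistic structures, not a plain map:

package main

import "fmt"

// limiterSketch admits a metricID only while the number of distinct ids
// stays under max, mirroring the bool-returning cardinality check.
type limiterSketch struct {
    seen map[uint64]struct{}
    max  int
}

func newLimiterSketch(max int) *limiterSketch {
    return &limiterSketch{seen: map[uint64]struct{}{}, max: max}
}

// add returns false when admitting metricID would exceed the limit,
// so the caller drops the row instead of creating the series.
func (l *limiterSketch) add(metricID uint64) bool {
    if _, ok := l.seen[metricID]; ok {
        return true // already counted
    }
    if len(l.seen) >= l.max {
        return false
    }
    l.seen[metricID] = struct{}{}
    return true
}

func main() {
    l := newLimiterSketch(2)
    fmt.Println(l.add(1), l.add(2), l.add(3)) // true true false
}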
@@ -1895,75 +2004,6 @@ func getUserReadableMetricName(metricNameRaw []byte) string {
return mn.String()
}

type pendingMetricRow struct {
MetricName []byte
mr *MetricRow
}

type pendingMetricRows struct {
pmrs []pendingMetricRow
metricNamesBuf []byte

lastMetricNameRaw []byte
lastMetricName []byte
mn MetricName
}

func (pmrs *pendingMetricRows) reset() {
mrs := pmrs.pmrs
for i := range mrs {
pmr := &mrs[i]
pmr.MetricName = nil
pmr.mr = nil
}
pmrs.pmrs = mrs[:0]
pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
pmrs.lastMetricNameRaw = nil
pmrs.lastMetricName = nil
pmrs.mn.Reset()
}

func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
pmrs.mn.sortTags()
metricNamesBufLen := len(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:]
pmrs.lastMetricNameRaw = mr.MetricNameRaw
}
mrs := pmrs.pmrs
if cap(mrs) > len(mrs) {
mrs = mrs[:len(mrs)+1]
} else {
mrs = append(mrs, pendingMetricRow{})
}
pmrs.pmrs = mrs
pmr := &mrs[len(mrs)-1]
pmr.MetricName = pmrs.lastMetricName
pmr.mr = mr
return nil
}

func getPendingMetricRows() *pendingMetricRows {
v := pendingMetricRowsPool.Get()
if v == nil {
v = &pendingMetricRows{}
}
return v.(*pendingMetricRows)
}

func putPendingMetricRows(pmrs *pendingMetricRows) {
pmrs.reset()
pendingMetricRowsPool.Put(pmrs)
}

var pendingMetricRowsPool sync.Pool

func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
var date uint64
var hour uint64
@@ -1983,11 +2023,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
type pendingDateMetricID struct {
date uint64
metricID uint64
accountID uint32
projectID uint32
mr *MetricRow
date uint64
tsid *TSID
mr *MetricRow
}
var pendingDateMetricIDs []pendingDateMetricID
var pendingNextDayMetricIDs []uint64
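The pMin expression above spreads next-day index pre-population over the last hour of the day: pMin climbs linearly from 0 to 1 between 23:00 and 24:00 UTC, and a metricID is pre-registered once its stable hash fraction falls below pMin. A self-contained illustration of the same arithmetic; fastHashUint64 is replaced by FNV here, and the timestamp is assumed to be in seconds:

package main

import (
    "encoding/binary"
    "fmt"
    "hash/fnv"
)

// shouldPreRegisterNextDay mirrors the pMin check: during the last hour of
// the day a growing fraction of metricIDs gets pre-registered for the next
// day, spreading the index writes instead of spiking them at midnight.
func shouldPreRegisterNextDay(tsSeconds int64, metricID uint64) bool {
    pMin := (float64(tsSeconds%(3600*24)) / 3600) - 23
    if pMin <= 0 {
        return false // not in the last hour of the day yet
    }
    var buf [8]byte
    binary.LittleEndian.PutUint64(buf[:], metricID)
    h := fnv.New32a()
    h.Write(buf[:])
    p := float64(h.Sum32()) / (1 << 32) // stable fraction in [0,1)
    return p < pMin
}

func main() {
    const halfPastEleven = 23*3600 + 1800 // 23:30 UTC, so pMin = 0.5
    n := 0
    for id := uint64(0); id < 1000; id++ {
        if shouldPreRegisterNextDay(halfPastEleven, id) {
            n++
        }
    }
    fmt.Printf("pre-registered %d of 1000 metricIDs at 23:30 (roughly half expected)\n", n)
}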
@@ -2007,7 +2045,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
prevDate = date
prevMetricID = metricID
if hour == hm.hour {
// The r belongs to the current hour. Check for the current hour cache.
// The row belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
@@ -2020,11 +2058,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
if p < pMin && !nextDayMetricIDs.Has(metricID) {
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1,
metricID: metricID,
accountID: r.TSID.AccountID,
projectID: r.TSID.ProjectID,
mr: mrs[i],
date: date + 1,
tsid: &r.TSID,
mr: mrs[i],
})
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
}
@@ -2049,11 +2085,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
accountID: r.TSID.AccountID,
projectID: r.TSID.ProjectID,
mr: mrs[i],
date: date,
tsid: &r.TSID,
mr: mrs[i],
})
}
if len(pendingNextDayMetricIDs) > 0 {
@@ -2067,7 +2101,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
s.pendingHourEntriesLock.Unlock()
}
if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entries in rows.
// Fast path - there are no new (date, metricID) entries.
return nil
}

@@ -2078,37 +2112,31 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
sort.Slice(pendingDateMetricIDs, func(i, j int) bool {
a := pendingDateMetricIDs[i]
b := pendingDateMetricIDs[j]
if a.accountID != b.accountID {
return a.accountID < b.accountID
if a.tsid.AccountID != b.tsid.AccountID {
return a.tsid.AccountID < b.tsid.AccountID
}
if a.projectID != b.projectID {
return a.projectID < b.projectID
if a.tsid.ProjectID != b.tsid.ProjectID {
return a.tsid.ProjectID < b.tsid.ProjectID
}
if a.date != b.date {
return a.date < b.date
}
return a.metricID < b.metricID
return a.tsid.MetricID < b.tsid.MetricID
})

idb := s.idb()
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)

var firstError error
dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs))
mn := GetMetricName()
for _, dmid := range pendingDateMetricIDs {
date := dmid.date
metricID := dmid.metricID
ok, err := is.hasDateMetricID(date, metricID, dmid.accountID, dmid.projectID)
if err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when locating (date=%s, metricID=%d, accountID=%d, projectID=%d) in database: %w",
dateToString(date), metricID, dmid.accountID, dmid.projectID, err)
}
continue
}
if !ok {
metricID := dmid.tsid.MetricID
if !is.hasDateMetricIDNoExtDB(date, metricID, dmid.tsid.AccountID, dmid.tsid.ProjectID) {
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
// It is OK if the (date, metricID) entry is added multiple times to db
// It is OK if the (date, metricID) entry is added multiple times to indexdb
// by concurrent goroutines.
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
if firstError == nil {
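The composite sort key above, now read through the TSID pointer as (AccountID, ProjectID, date, MetricID), presumably keeps the subsequent per-day index operations close together in the indexdb. A compact sketch of the same multi-key ordering on a trimmed-down struct; the type and field set are illustrative, not the real pendingDateMetricID:

package main

import (
    "fmt"
    "sort"
)

// pendingEntrySketch carries only the fields that participate in the sort.
type pendingEntrySketch struct {
    accountID uint32
    projectID uint32
    date      uint64
    metricID  uint64
}

// sortPending orders entries by (accountID, projectID, date, metricID),
// the same composite key as the diff above.
func sortPending(entries []pendingEntrySketch) {
    sort.Slice(entries, func(i, j int) bool {
        a, b := entries[i], entries[j]
        if a.accountID != b.accountID {
            return a.accountID < b.accountID
        }
        if a.projectID != b.projectID {
            return a.projectID < b.projectID
        }
        if a.date != b.date {
            return a.date < b.date
        }
        return a.metricID < b.metricID
    })
}

func main() {
    entries := []pendingEntrySketch{
        {1, 2, 19001, 7},
        {1, 1, 19001, 9},
        {1, 1, 19000, 3},
    }
    sortPending(entries)
    fmt.Println(entries)
}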
@@ -2117,7 +2145,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue
}
mn.sortTags()
is.createPerDayIndexes(date, metricID, mn)
is.createPerDayIndexes(date, dmid.tsid, mn)
}
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,
@@ -916,9 +916,7 @@ func testStorageRegisterMetricNames(s *Storage) error {
}
mrs = append(mrs, mr)
}
if err := s.RegisterMetricNames(nil, mrs); err != nil {
return fmt.Errorf("unexpected error in RegisterMetricNames: %w", err)
}
s.RegisterMetricNames(nil, mrs)
}
var addIDsExpected []string
for k := range addIDsMap {