mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: follow-up after 2ce02a7fe6
- Document the change at docs/CHANGELOG.md - Clarify comments for non-trivial code touched by the commit - Improve the logic behind maybeCreateIndexes(): - Correctly create per-day indexes if the indexdb rotation is performed during the first hour or the last hour of the day by UTC. Previously there was a possibility of missing index entries on that day. - Increase the duration for creating new indexes in the current indexdb for up to 22 hours after indexdb rotation. This should reduce the increased resource usage after indexdb rotation. It is safe to postpone index creation for the current day until the last hour of the current day after indexdb rotation by UTC, since the corresponding (date, ...) entries exist in the previous indexdb. - Search for TSID by (date, MetricName) in both the current and the previous indexdb. Previously the search was performed only in the current indexdb. This could lead to excess creation of per-day indexes for the current day just after indexdb rotation. - Search for (date, metricID) entries in both the current and the previous indexdb. Previously the search was performed only in the current indexdb. This could lead to excess creation of per-day indexes for the current day just after indexdb rotation.
This commit is contained in:
parent
2ce02a7fe6
commit
e0e16a2d36
5 changed files with 253 additions and 161 deletions
|
@ -27,6 +27,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* SECURITY: upgrade Go builder from Go1.20.3 to Go1.20.4. See [the list of issues addressed in Go1.20.4](https://github.com/golang/go/issues?q=milestone%3AGo1.20.4+label%3ACherryPickApproved).
|
* SECURITY: upgrade Go builder from Go1.20.3 to Go1.20.4. See [the list of issues addressed in Go1.20.4](https://github.com/golang/go/issues?q=milestone%3AGo1.20.4+label%3ACherryPickApproved).
|
||||||
* SECURITY: serve `/robots.txt` content to disallow indexing of the exposed instances by search engines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4128) for details.
|
* SECURITY: serve `/robots.txt` content to disallow indexing of the exposed instances by search engines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4128) for details.
|
||||||
|
|
||||||
|
* FEATURE: reduce memory usage by 2x and more for setups with big [retention](https://docs.victoriametrics.com/#retention) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4252).
|
||||||
* FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to change this flag in general case.
|
* FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to change this flag in general case.
|
||||||
* FEATURE: do not execute the incoming request if it has been canceled by the client before the execution start. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4223).
|
* FEATURE: do not execute the incoming request if it has been canceled by the client before the execution start. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4223).
|
||||||
* FEATURE: support time formats with timezones. For example, `2024-01-02+02:00` means `January 2, 2024` at `+02:00` time zone. See [these docs](https://docs.victoriametrics.com/#timestamp-formats).
|
* FEATURE: support time formats with timezones. For example, `2024-01-02+02:00` means `January 2, 2024` at `+02:00` time zone. See [these docs](https://docs.victoriametrics.com/#timestamp-formats).
|
||||||
|
|
|
@ -31,10 +31,16 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Prefix for MetricName->TSID entries.
|
// Prefix for MetricName->TSID entries.
|
||||||
// This index was substituted with nsPrefixDateMetricNameToTSID
|
//
|
||||||
// for resource efficiency sake. The intention is to speed up
|
// This index was substituted with nsPrefixDateMetricNameToTSID,
|
||||||
// index lookups on busy installations. Looking up in per-day
|
// since the MetricName->TSID index may require big amounts of memory for indexdb/dataBlocks cache
|
||||||
// index supposed to be much more efficient comparing to global index.
|
// when it grows big on the configured retention under high churn rate
|
||||||
|
// (e.g. when new time series are constantly registered).
|
||||||
|
//
|
||||||
|
// It is much more efficient from memory usage PoV to query per-day MetricName->TSID index
|
||||||
|
// (aka nsPrefixDateMetricNameToTSID) when the TSID must be obtained for the given MetricName
|
||||||
|
// during data ingestion under high churn rate and big retention.
|
||||||
|
//
|
||||||
// nsPrefixMetricNameToTSID = 0
|
// nsPrefixMetricNameToTSID = 0
|
||||||
|
|
||||||
// Prefix for Tag->MetricID entries.
|
// Prefix for Tag->MetricID entries.
|
||||||
|
@ -384,21 +390,22 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
|
||||||
db.s.metricNameCache.Set(key[:], metricName)
|
db.s.metricNameCache.Set(key[:], metricName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
|
// maybeCreateIndexes probabilistically creates global and per-day indexes for already existing tsid.
|
||||||
//
|
//
|
||||||
// The probability increases from 0 to 100% during the first hour since db rotation.
|
// The tsid is obtained from the previous indexdb, new indexes are created in the current indexdb.
|
||||||
//
|
//
|
||||||
// It returns true if new index entry was created, and false if it was skipped.
|
// The probability increases from 0 to 100% until the last hour of the indexdb rotation day.
|
||||||
func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
|
//
|
||||||
pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
|
// True is returned if new index entries were created, and false otherwise.
|
||||||
if pMin < 1 {
|
func (is *indexSearch) maybeCreateIndexes(genTSID *generationTSID, metricNameRaw []byte, date uint64) (bool, error) {
|
||||||
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
|
tsid := &genTSID.TSID
|
||||||
if p > pMin {
|
currentTimestamp := fasttime.UnixTimestamp()
|
||||||
// Fast path: there is no need creating indexes for metricNameRaw yet.
|
if canPostponeIndexCreation(currentTimestamp, is.db.rotationTimestamp, date, tsid.MetricID) {
|
||||||
return false, nil
|
// Fast path - postpone index creation
|
||||||
}
|
return false, nil
|
||||||
}
|
}
|
||||||
// Slow path: create indexes for (tsid, metricNameRaw) at db.
|
|
||||||
|
// Slow path: create indexes for (tsid, metricNameRaw, date).
|
||||||
mn := GetMetricName()
|
mn := GetMetricName()
|
||||||
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
|
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
|
||||||
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
|
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
|
||||||
|
@ -407,10 +414,58 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date
|
||||||
is.createGlobalIndexes(tsid, mn)
|
is.createGlobalIndexes(tsid, mn)
|
||||||
is.createPerDayIndexes(date, tsid, mn)
|
is.createPerDayIndexes(date, tsid, mn)
|
||||||
PutMetricName(mn)
|
PutMetricName(mn)
|
||||||
|
|
||||||
|
genTSID.generation = is.db.generation
|
||||||
|
|
||||||
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
|
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func canPostponeIndexCreation(currentTimestamp, rotationTimestamp, date, metricID uint64) bool {
|
||||||
|
if rotationTimestamp/(24*3600) != date {
|
||||||
|
// The date doesn't match the day of the indexdb rotation.
|
||||||
|
// This means that the previous indexdb doesn't contain the needed indexes
|
||||||
|
// for the given date, so index must be created in the current indexdb
|
||||||
|
// in order to be able to find time series for the given date.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate the time window when the index creation can be postponed.
|
||||||
|
// It starts just after the last indexdb rotation and ends one hour before the next day
|
||||||
|
// after the indexdb rotation.
|
||||||
|
d := rotationTimestamp % (24 * 3600)
|
||||||
|
if d < 3600 {
|
||||||
|
// The indexdb rotation occurred in less than an hour after the given date started.
|
||||||
|
// This means that the previous indexdb may miss entries for the given date.
|
||||||
|
// So index for the given date must be created in the current indexdb
|
||||||
|
// in order to be able to find time series for the given date.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
d = 24*3600 - d
|
||||||
|
if d < 3600 {
|
||||||
|
// The indexdb rotation occurred in less than an hour before the next date.
|
||||||
|
// Index for the given date must be created in the current indexdb
|
||||||
|
// in order to guarantee that it is created until the next date starts.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
d -= 3600
|
||||||
|
|
||||||
|
timeSinceRotation := currentTimestamp - rotationTimestamp
|
||||||
|
if currentTimestamp < rotationTimestamp || timeSinceRotation > d {
|
||||||
|
// The time window for postponing index creation is over.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Calculate the probability of index creation for the given metricID
|
||||||
|
pMin := uint64((float64(timeSinceRotation) / float64(d)) * (1 << 64))
|
||||||
|
p := fastHashUint64(metricID)
|
||||||
|
if p < pMin {
|
||||||
|
// The time window for creating the index is over
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// We have some time for postponing index creation
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
|
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
|
||||||
prefix := ^uint64(0)
|
prefix := ^uint64(0)
|
||||||
if versioned {
|
if versioned {
|
||||||
|
@ -480,25 +535,28 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
|
||||||
return dst, nil
|
return dst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
|
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
|
||||||
//
|
//
|
||||||
// It returns io.EOF if the given mn isn't found locally.
|
// It returns false if the given metricName isn't found in the indexdb.
|
||||||
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte, date uint64) error {
|
func (is *indexSearch) getTSIDByMetricName(dst *generationTSID, metricName []byte, date uint64) bool {
|
||||||
is := db.getIndexSearch(noDeadline)
|
if is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) {
|
||||||
err := is.getTSIDByMetricName(dst, metricName, date)
|
// Fast path - the TSID is found in the current indexdb.
|
||||||
db.putIndexSearch(is)
|
dst.generation = is.db.generation
|
||||||
if err == nil {
|
return true
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != io.EOF {
|
|
||||||
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do not search for the TSID in the external storage,
|
// Slow path - search for the TSID in the previous indexdb
|
||||||
// since this function is already called by another indexDB instance.
|
ok := false
|
||||||
|
deadline := is.deadline
|
||||||
// The TSID for the given mn wasn't found.
|
is.db.doExtDB(func(extDB *indexDB) {
|
||||||
return io.EOF
|
is := extDB.getIndexSearch(deadline)
|
||||||
|
ok = is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date)
|
||||||
|
extDB.putIndexSearch(is)
|
||||||
|
if ok {
|
||||||
|
dst.generation = extDB.generation
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
type indexSearch struct {
|
type indexSearch struct {
|
||||||
|
@ -516,31 +574,23 @@ type indexSearch struct {
|
||||||
tsidByNameSkips int
|
tsidByNameSkips int
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
|
// GetOrCreateTSIDByName fills the dst with TSID for the given (date, metricName)
|
||||||
//
|
//
|
||||||
// It also registers the metricName in global and per-day indexes
|
// The function creates new TSID and registers the metricName in global and per-day indexes
|
||||||
// for the given date if the metricName->TSID entry is missing in the index.
|
// if TSID for the given (date, metricName) isn't found.
|
||||||
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
|
func (is *indexSearch) GetOrCreateTSIDByName(dst *generationTSID, metricName, metricNameRaw []byte, date uint64) error {
|
||||||
// A hack: skip searching for the TSID after many serial misses.
|
// A hack: skip searching for the TSID after many serial misses.
|
||||||
// This should improve insertion performance for big batches
|
// This should improve insertion performance for big batches
|
||||||
// of new time series.
|
// of new time series.
|
||||||
if is.tsidByNameMisses < 100 {
|
if is.tsidByNameMisses < 100 {
|
||||||
err := is.getTSIDByMetricName(dst, metricName, date)
|
if is.getTSIDByMetricName(dst, metricName, date) {
|
||||||
if err == nil {
|
|
||||||
// Fast path - the TSID for the given metricName has been found in the index.
|
// Fast path - the TSID for the given metricName has been found in the index.
|
||||||
is.tsidByNameMisses = 0
|
is.tsidByNameMisses = 0
|
||||||
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
|
if err := is.db.s.registerSeriesCardinality(dst.TSID.MetricID, metricNameRaw); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// There is no need in checking whether the TSID is present in the per-day index for the given date,
|
|
||||||
// since this check must be performed by the caller in an optimized way.
|
|
||||||
// See storage.updatePerDateData() function.
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != io.EOF {
|
|
||||||
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
|
|
||||||
return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err)
|
|
||||||
}
|
|
||||||
is.tsidByNameMisses++
|
is.tsidByNameMisses++
|
||||||
} else {
|
} else {
|
||||||
is.tsidByNameSkips++
|
is.tsidByNameSkips++
|
||||||
|
@ -550,13 +600,14 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TSID for the given name wasn't found. Create it.
|
// TSID for the given (date, metricName) wasn't found. Create it.
|
||||||
// It is OK if duplicate TSID for mn is created by concurrent goroutines.
|
// It is OK if duplicate TSID for (date, metricName) is created by concurrent goroutines.
|
||||||
// Metric results will be merged by mn after TableSearch.
|
// Metric results will be merged by metricName at TableSearch.
|
||||||
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
|
if err := is.createTSIDByMetricName(&dst.TSID, metricName, metricNameRaw, date); err != nil {
|
||||||
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
|
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
|
||||||
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
|
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
|
||||||
}
|
}
|
||||||
|
dst.generation = is.db.generation
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,17 +636,15 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
|
||||||
db.indexSearchPool.Put(is)
|
db.indexSearchPool.Put(is)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
|
// createTSIDByMetricName creates new TSID for the given (date, metricName)
|
||||||
|
func (is *indexSearch) createTSIDByMetricName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
|
||||||
mn := GetMetricName()
|
mn := GetMetricName()
|
||||||
defer PutMetricName(mn)
|
defer PutMetricName(mn)
|
||||||
if err := mn.Unmarshal(metricName); err != nil {
|
if err := mn.Unmarshal(metricName); err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
|
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
created, err := is.db.getOrCreateTSID(dst, metricName, mn, date)
|
generateTSID(dst, mn)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot generate TSID: %w", err)
|
|
||||||
}
|
|
||||||
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
|
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -605,12 +654,9 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b
|
||||||
// There is no need in invalidating tag cache, since it is invalidated
|
// There is no need in invalidating tag cache, since it is invalidated
|
||||||
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
|
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
|
||||||
|
|
||||||
if created {
|
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
|
||||||
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
|
if logNewSeries {
|
||||||
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
|
logger.Infof("new series created: %s", mn.String())
|
||||||
if logNewSeries {
|
|
||||||
logger.Infof("new series created: %s", mn.String())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -624,30 +670,6 @@ func SetLogNewSeries(ok bool) {
|
||||||
|
|
||||||
var logNewSeries = false
|
var logNewSeries = false
|
||||||
|
|
||||||
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
|
|
||||||
//
|
|
||||||
// Returns true if TSID was created or false if TSID was in extDB
|
|
||||||
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName, date uint64) (bool, error) {
|
|
||||||
// Search the TSID in the external storage.
|
|
||||||
// This is usually the db from the previous period.
|
|
||||||
var err error
|
|
||||||
if db.doExtDB(func(extDB *indexDB) {
|
|
||||||
err = extDB.getTSIDByNameNoCreate(dst, metricName, date)
|
|
||||||
}) {
|
|
||||||
if err == nil {
|
|
||||||
// The TSID has been found in the external storage.
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
if err != io.EOF {
|
|
||||||
return false, fmt.Errorf("external search failed: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// The TSID wasn't found in the external storage.
|
|
||||||
// Generate it locally.
|
|
||||||
generateTSID(dst, mn)
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func generateTSID(dst *TSID, mn *MetricName) {
|
func generateTSID(dst *TSID, mn *MetricName) {
|
||||||
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
|
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
|
||||||
// Assume that the job-like metric is put at mn.Tags[0], while instance-like metric is put at mn.Tags[1]
|
// Assume that the job-like metric is put at mn.Tags[0], while instance-like metric is put at mn.Tags[1]
|
||||||
|
@ -1935,11 +1957,11 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
|
||||||
|
|
||||||
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
|
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date uint64) error {
|
func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte, date uint64) bool {
|
||||||
dmis := is.db.s.getDeletedMetricIDs()
|
dmis := is.db.s.getDeletedMetricIDs()
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
kb := &is.kb
|
kb := &is.kb
|
||||||
kb.B = append(kb.B[:0], nsPrefixDateMetricNameToTSID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateMetricNameToTSID)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
kb.B = append(kb.B, metricName...)
|
kb.B = append(kb.B, metricName...)
|
||||||
kb.B = append(kb.B, kvSeparatorChar)
|
kb.B = append(kb.B, kvSeparatorChar)
|
||||||
|
@ -1947,15 +1969,15 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date ui
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if !bytes.HasPrefix(ts.Item, kb.B) {
|
if !bytes.HasPrefix(ts.Item, kb.B) {
|
||||||
// Nothing found.
|
// Nothing found.
|
||||||
return io.EOF
|
return false
|
||||||
}
|
}
|
||||||
v := ts.Item[len(kb.B):]
|
v := ts.Item[len(kb.B):]
|
||||||
tail, err := dst.Unmarshal(v)
|
tail, err := dst.Unmarshal(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal TSID: %w", err)
|
logger.Panicf("FATAL: cannot unmarshal TSID: %s", err)
|
||||||
}
|
}
|
||||||
if len(tail) > 0 {
|
if len(tail) > 0 {
|
||||||
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
|
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling TSID: %X", tail)
|
||||||
}
|
}
|
||||||
if dmis.Len() > 0 {
|
if dmis.Len() > 0 {
|
||||||
// Verify whether the dst is marked as deleted.
|
// Verify whether the dst is marked as deleted.
|
||||||
|
@ -1965,13 +1987,13 @@ func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date ui
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Found valid dst.
|
// Found valid dst.
|
||||||
return nil
|
return true
|
||||||
}
|
}
|
||||||
if err := ts.Error(); err != nil {
|
if err := ts.Error(); err != nil {
|
||||||
return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
|
logger.Panicf("FATAL: error when searching TSID by metricName; searchPrefix %q: %s", kb.B, err)
|
||||||
}
|
}
|
||||||
// Nothing found
|
// Nothing found
|
||||||
return io.EOF
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
|
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
|
||||||
|
@ -2846,6 +2868,9 @@ func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricNa
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
|
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
|
||||||
is.db.tb.AddItems(ii.Items)
|
is.db.tb.AddItems(ii.Items)
|
||||||
|
|
||||||
|
// Register the (date, metricID) entry in the cache,
|
||||||
|
// so next time the entry is found there instead of searching for it in the indexdb.
|
||||||
is.db.s.dateMetricIDCache.Set(date, tsid.MetricID)
|
is.db.s.dateMetricIDCache.Set(date, tsid.MetricID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2956,22 +2981,41 @@ func reverseBytes(dst, src []byte) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
|
func (is *indexSearch) hasDateMetricID(date, metricID uint64) bool {
|
||||||
|
if is.hasDateMetricIDNoExtDB(date, metricID) {
|
||||||
|
// Fast path - the (date, metricID) entry is found in the current indexdb.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - search for the (date, metricID) in the previous indexdb.
|
||||||
|
ok := false
|
||||||
|
deadline := is.deadline
|
||||||
|
is.db.doExtDB(func(extDB *indexDB) {
|
||||||
|
is := extDB.getIndexSearch(deadline)
|
||||||
|
ok = is.hasDateMetricIDNoExtDB(date, metricID)
|
||||||
|
extDB.putIndexSearch(is)
|
||||||
|
})
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64) bool {
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
kb := &is.kb
|
kb := &is.kb
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
kb.B = encoding.MarshalUint64(kb.B, metricID)
|
||||||
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
|
err := ts.FirstItemWithPrefix(kb.B)
|
||||||
if err == io.EOF {
|
if err == nil {
|
||||||
return false, nil
|
if string(ts.Item) != string(kb.B) {
|
||||||
|
logger.Panicf("FATAL: unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
|
||||||
}
|
}
|
||||||
return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err)
|
// Fast path - the (date, metricID) entry is found in the current indexdb.
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
if string(ts.Item) != string(kb.B) {
|
if err != io.EOF {
|
||||||
return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
|
logger.Panicf("FATAL: unexpected error when searching for (date=%s, metricID=%d) entry: %s", dateToString(date), metricID, err)
|
||||||
}
|
}
|
||||||
return true, nil
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
|
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
|
||||||
|
|
|
@ -22,6 +22,49 @@ import (
|
||||||
"github.com/VictoriaMetrics/fastcache"
|
"github.com/VictoriaMetrics/fastcache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestCanPostponeIndexCreation(t *testing.T) {
|
||||||
|
f := func(currentTime, rotationTimestamp, date, metricID uint64, resultExpected bool) {
|
||||||
|
t.Helper()
|
||||||
|
result := canPostponeIndexCreation(currentTime, rotationTimestamp, date, metricID)
|
||||||
|
if result != resultExpected {
|
||||||
|
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// the date doesn't match rotationTimestamp
|
||||||
|
f(0, 0, 1, 0, false)
|
||||||
|
|
||||||
|
// the rotationTimestamp is less than an hour to the day start
|
||||||
|
f(0, 0, 0, 0, false)
|
||||||
|
|
||||||
|
// the rotationTimestamp is less than an hour before the next day start
|
||||||
|
f(23*3600+10, 23*3600+1, 0, 0, false)
|
||||||
|
|
||||||
|
// the time for index creation is over
|
||||||
|
f(0, 10*3600, 0, 0, false)
|
||||||
|
f(23*3600+1, 10*3600, 0, 0, false)
|
||||||
|
f(24*3600, 10*3600, 0, 0, false)
|
||||||
|
f(1234567890, 0, 0, 0, false)
|
||||||
|
|
||||||
|
// the time for index creation for the given metricID is over
|
||||||
|
f(23*3600-1, 4*3600, 0, 1, false)
|
||||||
|
f(22*3600, 4*3600, 0, 1, false)
|
||||||
|
f(10*3600, 4*3600, 0, 1, false)
|
||||||
|
|
||||||
|
// it is ok to create index for the given metricID
|
||||||
|
f(4*3600, 4*3600, 0, 1, true)
|
||||||
|
f(7*3600, 4*3600, 0, 1, true)
|
||||||
|
f(9*3600, 4*3600, 0, 1, true)
|
||||||
|
|
||||||
|
// different metricID
|
||||||
|
f(10*3600, 4*3600, 0, 12345, true)
|
||||||
|
f(14*3600, 4*3600, 0, 12345, true)
|
||||||
|
f(15*3600, 4*3600, 0, 12345, true)
|
||||||
|
f(16*3600, 4*3600, 0, 12345, false)
|
||||||
|
f(17*3600, 4*3600, 0, 12345, false)
|
||||||
|
f(20*3600, 4*3600, 0, 12345, false)
|
||||||
|
}
|
||||||
|
|
||||||
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
|
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
|
||||||
f := func(metricIDs []uint64) {
|
f := func(metricIDs []uint64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
@ -614,13 +657,13 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
|
||||||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||||
|
|
||||||
// Create tsid for the metricName.
|
// Create tsid for the metricName.
|
||||||
var tsid TSID
|
var genTSID generationTSID
|
||||||
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, date); err != nil {
|
if err := is.GetOrCreateTSIDByName(&genTSID, metricNameBuf, metricNameRawBuf, date); err != nil {
|
||||||
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
|
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mns = append(mns, mn)
|
mns = append(mns, mn)
|
||||||
tsids = append(tsids, tsid)
|
tsids = append(tsids, genTSID.TSID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// fill Date -> MetricID cache
|
// fill Date -> MetricID cache
|
||||||
|
@ -647,7 +690,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
|
|
||||||
currentTime := timestampFromTime(time.Now())
|
currentTime := timestampFromTime(time.Now())
|
||||||
timeseriesCounters := make(map[uint64]bool)
|
timeseriesCounters := make(map[uint64]bool)
|
||||||
var tsidCopy TSID
|
var genTSID generationTSID
|
||||||
var metricNameCopy []byte
|
var metricNameCopy []byte
|
||||||
allLabelNames := make(map[string]bool)
|
allLabelNames := make(map[string]bool)
|
||||||
for i := range mns {
|
for i := range mns {
|
||||||
|
@ -660,26 +703,29 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
metricName := mn.Marshal(nil)
|
metricName := mn.Marshal(nil)
|
||||||
|
|
||||||
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName, uint64(currentTime)/msecPerDay); err != nil {
|
is := db.getIndexSearch(noDeadline)
|
||||||
return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err)
|
if !is.getTSIDByMetricName(&genTSID, metricName, uint64(currentTime)/msecPerDay) {
|
||||||
|
return fmt.Errorf("cannot obtain tsid #%d for mn %s", i, mn)
|
||||||
}
|
}
|
||||||
|
db.putIndexSearch(is)
|
||||||
|
|
||||||
if isConcurrent {
|
if isConcurrent {
|
||||||
// Copy tsid.MetricID, since multiple TSIDs may match
|
// Copy tsid.MetricID, since multiple TSIDs may match
|
||||||
// the same mn in concurrent mode.
|
// the same mn in concurrent mode.
|
||||||
tsidCopy.MetricID = tsid.MetricID
|
genTSID.TSID.MetricID = tsid.MetricID
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(tsid, &tsidCopy) {
|
if !reflect.DeepEqual(tsid, &genTSID.TSID) {
|
||||||
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &tsidCopy, tsid)
|
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &genTSID.TSID, tsid)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Search for metric name for the given metricID.
|
// Search for metric name for the given metricID.
|
||||||
var err error
|
var err error
|
||||||
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID)
|
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
|
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", genTSID.TSID.MetricID, i, err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(metricName, metricNameCopy) {
|
if !bytes.Equal(metricName, metricNameCopy) {
|
||||||
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
|
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", genTSID.TSID.MetricID, metricNameCopy, metricName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try searching metric name for non-existent MetricID.
|
// Try searching metric name for non-existent MetricID.
|
||||||
|
@ -1461,9 +1507,11 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const metricRowsN = 1000
|
const metricRowsN = 1000
|
||||||
// use min-max timestamps of 1month range to create smaller number of partitions
|
|
||||||
timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now()
|
currentDayTimestamp := (time.Now().UnixMilli() / msecPerDay) * msecPerDay
|
||||||
mrs := testGenerateMetricRows(r, metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli())
|
timeMin := currentDayTimestamp - 24*3600*1000
|
||||||
|
timeMax := currentDayTimestamp + 24*3600*1000
|
||||||
|
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
|
||||||
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
|
||||||
t.Fatalf("unexpected error when adding mrs: %s", err)
|
t.Fatalf("unexpected error when adding mrs: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -1528,17 +1576,14 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
|
||||||
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
|
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
|
||||||
entriesByGeneration[genTSID.generation]++
|
entriesByGeneration[genTSID.generation]++
|
||||||
}
|
}
|
||||||
if len(entriesByGeneration) > 2 {
|
if len(entriesByGeneration) != 2 {
|
||||||
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
|
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
|
||||||
}
|
}
|
||||||
prevEntries := entriesByGeneration[prevGeneration]
|
prevEntries := entriesByGeneration[prevGeneration]
|
||||||
currEntries := entriesByGeneration[dbNew.generation]
|
currEntries := entriesByGeneration[dbNew.generation]
|
||||||
totalEntries := prevEntries + currEntries
|
totalEntries := prevEntries + currEntries
|
||||||
if totalEntries != metricRowsN {
|
if float64(currEntries)/float64(totalEntries) < 0.1 {
|
||||||
t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN)
|
t.Fatalf("too small share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
|
||||||
}
|
|
||||||
if float64(currEntries)/float64(totalEntries) > 0.1 {
|
|
||||||
t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1602,12 +1647,12 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||||
|
|
||||||
metricNameBuf = mn.Marshal(metricNameBuf[:0])
|
metricNameBuf = mn.Marshal(metricNameBuf[:0])
|
||||||
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
|
||||||
var tsid TSID
|
var genTSID generationTSID
|
||||||
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
|
if err := is.GetOrCreateTSIDByName(&genTSID, metricNameBuf, metricNameRawBuf, 0); err != nil {
|
||||||
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
|
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
|
||||||
}
|
}
|
||||||
mns = append(mns, mn)
|
mns = append(mns, mn)
|
||||||
tsids = append(tsids, tsid)
|
tsids = append(tsids, genTSID.TSID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the metrics to the per-day stores
|
// Add the metrics to the per-day stores
|
||||||
|
|
|
@ -58,7 +58,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
var mn MetricName
|
var mn MetricName
|
||||||
var tsid TSID
|
var genTSID generationTSID
|
||||||
|
|
||||||
// The most common tags.
|
// The most common tags.
|
||||||
mn.Tags = []Tag{
|
mn.Tags = []Tag{
|
||||||
|
@ -72,14 +72,14 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
|
||||||
|
|
||||||
startOffset := 0
|
startOffset := 0
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop)
|
benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, startOffset, recordsPerLoop)
|
||||||
startOffset += recordsPerLoop
|
startOffset += recordsPerLoop
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
is := db.getIndexSearch(noDeadline)
|
is := db.getIndexSearch(noDeadline)
|
||||||
|
@ -92,7 +92,7 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
metricName = mn.Marshal(metricName[:0])
|
metricName = mn.Marshal(metricName[:0])
|
||||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||||
if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
|
if err := is.GetOrCreateTSIDByName(genTSID, metricName, metricNameRaw, 0); err != nil {
|
||||||
panic(fmt.Errorf("cannot insert record: %w", err))
|
panic(fmt.Errorf("cannot insert record: %w", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,7 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
metricName = mn.Marshal(metricName[:0])
|
metricName = mn.Marshal(metricName[:0])
|
||||||
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
|
||||||
if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
|
if err := is.createTSIDByMetricName(&tsid, metricName, metricNameRaw, 0); err != nil {
|
||||||
b.Fatalf("cannot insert record: %s", err)
|
b.Fatalf("cannot insert record: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||||
value := fmt.Sprintf("value_%d", i)
|
value := fmt.Sprintf("value_%d", i)
|
||||||
mn.AddTag(key, value)
|
mn.AddTag(key, value)
|
||||||
}
|
}
|
||||||
var tsid TSID
|
var genTSID generationTSID
|
||||||
var metricName []byte
|
var metricName []byte
|
||||||
var metricNameRaw []byte
|
var metricNameRaw []byte
|
||||||
|
|
||||||
|
@ -312,7 +312,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
metricName = mn.Marshal(metricName[:0])
|
metricName = mn.Marshal(metricName[:0])
|
||||||
metricNameRaw = mn.marshalRaw(metricName[:0])
|
metricNameRaw = mn.marshalRaw(metricName[:0])
|
||||||
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
|
if err := is.GetOrCreateTSIDByName(&genTSID, metricName, metricNameRaw, 0); err != nil {
|
||||||
b.Fatalf("cannot insert record: %s", err)
|
b.Fatalf("cannot insert record: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -321,7 +321,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
var tsidLocal TSID
|
var tsidLocal generationTSID
|
||||||
var metricNameLocal []byte
|
var metricNameLocal []byte
|
||||||
var metricNameLocalRaw []byte
|
var metricNameLocalRaw []byte
|
||||||
mnLocal := mn
|
mnLocal := mn
|
||||||
|
|
|
@ -1176,8 +1176,7 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
|
||||||
}
|
}
|
||||||
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
|
// Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs.
|
||||||
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
|
|
||||||
|
|
||||||
// Do not reset MetricID->MetricName cache, since it must be used only
|
// Do not reset MetricID->MetricName cache, since it must be used only
|
||||||
// after filtering out deleted metricIDs.
|
// after filtering out deleted metricIDs.
|
||||||
|
@ -1531,9 +1530,9 @@ var metricRowsInsertCtxPool sync.Pool
|
||||||
|
|
||||||
const maxMetricRowsPerBlock = 8000
|
const maxMetricRowsPerBlock = 8000
|
||||||
|
|
||||||
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
|
// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later.
|
||||||
//
|
//
|
||||||
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
|
// The the MetricRow.Timestamp is used for registering the metric name at the given day according to the timestamp.
|
||||||
// Th MetricRow.Value field is ignored.
|
// Th MetricRow.Value field is ignored.
|
||||||
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
|
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
|
||||||
qt = qt.NewChild("registering %d series", len(mrs))
|
qt = qt.NewChild("registering %d series", len(mrs))
|
||||||
|
@ -1564,13 +1563,12 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) e
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
metricName = mn.Marshal(metricName[:0])
|
metricName = mn.Marshal(metricName[:0])
|
||||||
date := uint64(mr.Timestamp) / msecPerDay
|
date := uint64(mr.Timestamp) / msecPerDay
|
||||||
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
|
if err := is.GetOrCreateTSIDByName(&genTSID, metricName, mr.MetricNameRaw, date); err != nil {
|
||||||
if errors.Is(err, errSeriesCardinalityExceeded) {
|
if errors.Is(err, errSeriesCardinalityExceeded) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)
|
return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)
|
||||||
}
|
}
|
||||||
genTSID.generation = idb.generation
|
|
||||||
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -1629,6 +1627,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
r.Timestamp = mr.Timestamp
|
r.Timestamp = mr.Timestamp
|
||||||
r.Value = mr.Value
|
r.Value = mr.Value
|
||||||
r.PrecisionBits = precisionBits
|
r.PrecisionBits = precisionBits
|
||||||
|
|
||||||
|
// Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
|
||||||
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
|
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
|
||||||
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
|
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
|
||||||
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
|
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
|
||||||
|
@ -1636,7 +1636,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
|
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
|
||||||
|
// The TSID for mr.MetricNameRaw has been found in the cache.
|
||||||
|
|
||||||
if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
|
if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
|
||||||
|
// Skip r, since it exceeds cardinality limit
|
||||||
j--
|
j--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1649,18 +1652,21 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
prevMetricNameRaw = mr.MetricNameRaw
|
prevMetricNameRaw = mr.MetricNameRaw
|
||||||
|
|
||||||
if genTSID.generation != idb.generation {
|
if genTSID.generation != idb.generation {
|
||||||
// The found entry is from the previous cache generation,
|
// The found TSID is from the previous cache generation (e.g. from the previous indexdb),
|
||||||
// so attempt to re-populate the current generation with this entry.
|
// so attempt to create TSID indexes for the current date.
|
||||||
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
||||||
date := uint64(r.Timestamp) / msecPerDay
|
date := uint64(r.Timestamp) / msecPerDay
|
||||||
created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date)
|
created, err := is.maybeCreateIndexes(&genTSID, mr.MetricNameRaw, date)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot create indexes: %w", err)
|
return fmt.Errorf("cannot create indexes: %w", err)
|
||||||
}
|
}
|
||||||
if created {
|
if created {
|
||||||
genTSID.generation = idb.generation
|
|
||||||
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
||||||
}
|
}
|
||||||
|
// It is OK if TSID indexes aren't created for the current date.
|
||||||
|
// This means they exist for the current date in the previous indexdb,
|
||||||
|
// and there is enough time for creating them in the current indexdb
|
||||||
|
// until the next day starts.
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1706,9 +1712,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
}
|
}
|
||||||
slowInsertsCount++
|
slowInsertsCount++
|
||||||
date := uint64(r.Timestamp) / msecPerDay
|
date := uint64(r.Timestamp) / msecPerDay
|
||||||
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
|
if err := is.GetOrCreateTSIDByName(&genTSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
|
||||||
j--
|
j--
|
||||||
if errors.Is(err, errSeriesCardinalityExceeded) {
|
if errors.Is(err, errSeriesCardinalityExceeded) {
|
||||||
|
// Skip the row, since it exceeds the configured cardinality limit.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Do not stop adding rows on error - just skip invalid row.
|
// Do not stop adding rows on error - just skip invalid row.
|
||||||
|
@ -1719,8 +1726,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
genTSID.generation = idb.generation
|
r.TSID = genTSID.TSID
|
||||||
genTSID.TSID = r.TSID
|
|
||||||
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
|
||||||
|
|
||||||
prevTSID = r.TSID
|
prevTSID = r.TSID
|
||||||
|
@ -1897,7 +1903,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||||
prevDate = date
|
prevDate = date
|
||||||
prevMetricID = metricID
|
prevMetricID = metricID
|
||||||
if hour == hm.hour {
|
if hour == hm.hour {
|
||||||
// The r belongs to the current hour. Check for the current hour cache.
|
// The row belongs to the current hour. Check for the current hour cache.
|
||||||
if hm.m.Has(metricID) {
|
if hm.m.Has(metricID) {
|
||||||
// Fast path: the metricID is in the current hour cache.
|
// Fast path: the metricID is in the current hour cache.
|
||||||
// This means the metricID has been already added to per-day inverted index.
|
// This means the metricID has been already added to per-day inverted index.
|
||||||
|
@ -1948,7 +1954,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||||
s.pendingHourEntriesLock.Unlock()
|
s.pendingHourEntriesLock.Unlock()
|
||||||
}
|
}
|
||||||
if len(pendingDateMetricIDs) == 0 {
|
if len(pendingDateMetricIDs) == 0 {
|
||||||
// Fast path - there are no new (date, metricID) entries in rows.
|
// Fast path - there are no new (date, metricID) entries.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1964,25 +1970,20 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||||
}
|
}
|
||||||
return a.tsid.MetricID < b.tsid.MetricID
|
return a.tsid.MetricID < b.tsid.MetricID
|
||||||
})
|
})
|
||||||
|
|
||||||
idb := s.idb()
|
idb := s.idb()
|
||||||
is := idb.getIndexSearch(noDeadline)
|
is := idb.getIndexSearch(noDeadline)
|
||||||
defer idb.putIndexSearch(is)
|
defer idb.putIndexSearch(is)
|
||||||
|
|
||||||
var firstError error
|
var firstError error
|
||||||
dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs))
|
dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs))
|
||||||
mn := GetMetricName()
|
mn := GetMetricName()
|
||||||
for _, dmid := range pendingDateMetricIDs {
|
for _, dmid := range pendingDateMetricIDs {
|
||||||
date := dmid.date
|
date := dmid.date
|
||||||
metricID := dmid.tsid.MetricID
|
metricID := dmid.tsid.MetricID
|
||||||
ok, err := is.hasDateMetricID(date, metricID)
|
if !is.hasDateMetricID(date, metricID) {
|
||||||
if err != nil {
|
|
||||||
if firstError == nil {
|
|
||||||
firstError = fmt.Errorf("error when locating (date=%s, metricID=%d) in database: %w", dateToString(date), metricID, err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
|
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
|
||||||
// It is OK if the (date, metricID) entry is added multiple times to db
|
// It is OK if the (date, metricID) entry is added multiple times to indexdb
|
||||||
// by concurrent goroutines.
|
// by concurrent goroutines.
|
||||||
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
|
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
|
||||||
if firstError == nil {
|
if firstError == nil {
|
||||||
|
@ -1992,11 +1993,12 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||||
}
|
}
|
||||||
mn.sortTags()
|
mn.sortTags()
|
||||||
is.createPerDayIndexes(date, dmid.tsid, mn)
|
is.createPerDayIndexes(date, dmid.tsid, mn)
|
||||||
|
} else {
|
||||||
|
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
|
||||||
|
date: date,
|
||||||
|
metricID: metricID,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
|
|
||||||
date: date,
|
|
||||||
metricID: metricID,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
PutMetricName(mn)
|
PutMetricName(mn)
|
||||||
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.
|
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.
|
||||||
|
|
Loading…
Reference in a new issue