lib/storage: revert the migration from global to per-day index for (MetricName -> TSID)

This reverts the following commits:
- e0e16a2d36
- 2ce02a7fe6

The reason for revert: the updated logic breaks assumptions made
when fixing https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698 .
For example, if a time series stops receiving new samples during the first
day after the indexdb rotation, there are chances that the time series
won't be registered in the new indexdb. This is OK until the next indexdb
rotation, since the time series is registered in the previous indexdb,
so it can be found during queries. But the time series will become invisible
for search after the next indexdb rotation, while its data is still there.

There is also an incompletely solved issue with increased CPU and disk IO resource
usage just after the indexdb rotation. There was an attempt to fix it, but it didn't fix
it in full, while introducing the issue mentioned above. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

TODO: find a solution which simultaneously solves the following issues:
- increased memory usage for setups with high churn rate and long retention (e.g. what the reverted commit does)
- increased CPU and disk IO usage during indexdb rotation ( https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 )
- https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698

Possible solution: create the new indexdb one hour before the indexdb rotation
and to gradually pre-populate it with the needed index data during the last hour before indexdb rotation.
Then the new indexdb will contain all the needed data just after the rotation,
so it won't trigger increased CPU and disk IO.
This commit is contained in:
Aliaksandr Valialkin 2023-05-18 11:11:42 -07:00
parent 4f7f750850
commit 1f28b46ae9
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 187 additions and 286 deletions

View file

@ -27,7 +27,6 @@ The following tip changes can be tested by building VictoriaMetrics components f
* SECURITY: upgrade Go builder from Go1.20.3 to Go1.20.4. See [the list of issues addressed in Go1.20.4](https://github.com/golang/go/issues?q=milestone%3AGo1.20.4+label%3ACherryPickApproved).
* SECURITY: serve `/robots.txt` content to disallow indexing of the exposed instances by search engines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4128) for details.
* FEATURE: reduce memory usage by 2x and more for setups with big [retention](https://docs.victoriametrics.com/#retention) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4252).
* FEATURE: update [docker compose environment](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics) to V2 in respect to V1 deprecation notice from June 2023. See [Migrate to Compose V2](https://docs.docker.com/compose/migrate/).
* FEATURE: deprecate `-bigMergeConcurrency` command-line flag, since improper configuration for this flag frequently led to uncontrolled growth of unmerged parts, which, in turn, could lead to queries slowdown and increased CPU usage. The concurrency for [background merges](https://docs.victoriametrics.com/#storage) can be controlled via `-smallMergeConcurrency` command-line flag, though it isn't recommended to change this flag in general case.
* FEATURE: do not execute the incoming request if it has been canceled by the client before the execution start. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4223).

View file

@ -31,17 +31,7 @@ import (
const (
// Prefix for MetricName->TSID entries.
//
// This index was substituted with nsPrefixDateMetricNameToTSID,
// since the MetricName->TSID index may require big amounts of memory for indexdb/dataBlocks cache
// when it grows big on the configured retention under high churn rate
// (e.g. when new time series are constantly registered).
//
// It is much more efficient from memory usage PoV to query per-day MetricName->TSID index
// (aka nsPrefixDateMetricNameToTSID) when the TSID must be obtained for the given MetricName
// during data ingestion under high churn rate and big retention.
//
// nsPrefixMetricNameToTSID = 0
nsPrefixMetricNameToTSID = 0
// Prefix for Tag->MetricID entries.
nsPrefixTagToMetricIDs = 1
@ -60,9 +50,6 @@ const (
// Prefix for (Date,Tag)->MetricID entries.
nsPrefixDateTagToMetricIDs = 6
// Prefix for (Date,MetricName)->TSID entries.
nsPrefixDateMetricNameToTSID = 7
)
// indexDB represents an index db.
@ -390,79 +377,33 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
db.s.metricNameCache.Set(key[:], metricName)
}
// maybeCreateIndexes probabilistically creates global and per-day indexes for already existing tsid.
// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
//
// The tsid is obtained from the previous indexdb, new indexes are created in the current indexdb.
// The probability increases from 0 to 100% during the first hour since db rotation.
//
// The probability increases from 0 to 100% until the last hour of the indexdb rotation day.
//
// True is returned if new index entries were created, and false otherwise.
func (is *indexSearch) maybeCreateIndexes(genTSID *generationTSID, metricNameRaw []byte, date uint64) (bool, error) {
tsid := &genTSID.TSID
currentTimestamp := fasttime.UnixTimestamp()
if canPostponeIndexCreation(currentTimestamp, is.db.rotationTimestamp, date, tsid.MetricID) {
// Fast path - postpone index creation
return false, nil
// It returns true if new index entry was created, and false if it was skipped.
func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
if pMin < 1 {
p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
if p > pMin {
// Fast path: there is no need creating indexes for metricNameRaw yet.
return false, nil
}
}
// Slow path: create indexes for (tsid, metricNameRaw, date).
// Slow path: create indexes for (tsid, metricNameRaw) at db.
mn := GetMetricName()
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
}
mn.sortTags()
is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, tsid, mn)
is.createPerDayIndexes(date, tsid.MetricID, mn)
PutMetricName(mn)
genTSID.generation = is.db.generation
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil
}
func canPostponeIndexCreation(currentTimestamp, rotationTimestamp, date, metricID uint64) bool {
if rotationTimestamp/(24*3600) != date {
// The date doesn't match the day of the indexdb rotation.
// This means that the previous indexdb doesn't contain the needed indexes
// for the given date, so index must be created in the current indexdb
// in order to be able to find time series for the given date.
return false
}
// Calculate the time window when the index creation can be postponed.
// It starts just after the last indexdb rotation and ends one hour before the next day
// after the indexdb rotation.
d := rotationTimestamp % (24 * 3600)
if d < 3600 {
// The indexdb rotation occurred in less than an hour after the given date started.
// This means that the previous indexdb may miss entries for the given date.
// So index for the given date must be created in the current indexdb
// in order to be able to find time series for the given date.
return false
}
d = 24*3600 - d
if d < 3600 {
// The indexdb rotation occurred in less than an hour before the next date.
// Index for the given date must be created in the current indexdb
// in order to guarantee that it is created until the next date starts.
return false
}
d -= 3600
timeSinceRotation := currentTimestamp - rotationTimestamp
if currentTimestamp < rotationTimestamp || timeSinceRotation > d {
// The time window for postponing index creation is over.
return false
}
// Calculate the probability of index creation for the given metricID
pMin := uint64((float64(timeSinceRotation) / float64(d)) * (1 << 64))
p := fastHashUint64(metricID)
// If p is smaller than pMin, means the time window for creating the index is over.
// Otherwise we still have some time for postponing index creation
return p >= pMin
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
prefix := ^uint64(0)
if versioned {
@ -532,28 +473,25 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
return dst, nil
}
// getTSIDByMetricName fills the dst with TSID for the given metricName at the given date.
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
//
// It returns false if the given metricName isn't found in the indexdb.
func (is *indexSearch) getTSIDByMetricName(dst *generationTSID, metricName []byte, date uint64) bool {
if is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date) {
// Fast path - the TSID is found in the current indexdb.
dst.generation = is.db.generation
return true
// It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
is := db.getIndexSearch(noDeadline)
err := is.getTSIDByMetricName(dst, metricName)
db.putIndexSearch(is)
if err == nil {
return nil
}
if err != io.EOF {
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
}
// Slow path - search for the TSID in the previous indexdb
ok := false
deadline := is.deadline
is.db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
ok = is.getTSIDByMetricNameNoExtDB(&dst.TSID, metricName, date)
extDB.putIndexSearch(is)
if ok {
dst.generation = extDB.generation
}
})
return ok
// Do not search for the TSID in the external storage,
// since this function is already called by another indexDB instance.
// The TSID for the given mn wasn't found.
return io.EOF
}
type indexSearch struct {
@ -571,23 +509,31 @@ type indexSearch struct {
tsidByNameSkips int
}
// GetOrCreateTSIDByName fills the dst with TSID for the given (date, metricName)
// GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
//
// The function creates new TSID and registers the metricName in global and per-day indexes
// if TSID for the given (date, metricName) isn't found.
func (is *indexSearch) GetOrCreateTSIDByName(dst *generationTSID, metricName, metricNameRaw []byte, date uint64) error {
// It also registers the metricName in global and per-day indexes
// for the given date if the metricName->TSID entry is missing in the index.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// A hack: skip searching for the TSID after many serial misses.
// This should improve insertion performance for big batches
// of new time series.
if is.tsidByNameMisses < 100 {
if is.getTSIDByMetricName(dst, metricName, date) {
err := is.getTSIDByMetricName(dst, metricName)
if err == nil {
// Fast path - the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
if err := is.db.s.registerSeriesCardinality(dst.TSID.MetricID, metricNameRaw); err != nil {
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
// There is no need in checking whether the TSID is present in the per-day index for the given date,
// since this check must be performed by the caller in an optimized way.
// See storage.updatePerDateData() function.
return nil
}
if err != io.EOF {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err)
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
@ -597,14 +543,13 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *generationTSID, metricName, me
}
}
// TSID for the given (date, metricName) wasn't found. Create it.
// It is OK if duplicate TSID for (date, metricName) is created by concurrent goroutines.
// Metric results will be merged by metricName at TableSearch.
if err := is.createTSIDByMetricName(&dst.TSID, metricName, metricNameRaw, date); err != nil {
// TSID for the given name wasn't found. Create it.
// It is OK if duplicate TSID for mn is created by concurrent goroutines.
// Metric results will be merged by mn after TableSearch.
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
}
dst.generation = is.db.generation
return nil
}
@ -633,27 +578,32 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
db.indexSearchPool.Put(is)
}
// createTSIDByMetricName creates new TSID for the given (date, metricName)
func (is *indexSearch) createTSIDByMetricName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
mn := GetMetricName()
defer PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
}
generateTSID(dst, mn)
created, err := is.db.getOrCreateTSID(dst, metricName, mn)
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
is.createGlobalIndexes(dst, mn)
is.createPerDayIndexes(date, dst, mn)
is.createPerDayIndexes(date, dst.MetricID, mn)
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
if created {
// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
if logNewSeries {
logger.Infof("new series created: %s", mn.String())
}
}
return nil
}
@ -667,6 +617,30 @@ func SetLogNewSeries(ok bool) {
var logNewSeries = false
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
//
// Returns true if TSID was created or false if TSID was in extDB
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
// Search the TSID in the external storage.
// This is usually the db from the previous period.
var err error
if db.doExtDB(func(extDB *indexDB) {
err = extDB.getTSIDByNameNoCreate(dst, metricName)
}) {
if err == nil {
// The TSID has been found in the external storage.
return false, nil
}
if err != io.EOF {
return false, fmt.Errorf("external search failed: %w", err)
}
}
// The TSID wasn't found in the external storage.
// Generate it locally.
generateTSID(dst, mn)
return true, nil
}
func generateTSID(dst *TSID, mn *MetricName) {
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
// Assume that the job-like metric is put at mn.Tags[0], while instance-like metric is put at mn.Tags[1]
@ -690,6 +664,13 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
@ -1954,27 +1935,26 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte, date uint64) bool {
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
dmis := is.db.s.getDeletedMetricIDs()
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateMetricNameToTSID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
kb.B = append(kb.B, metricName...)
kb.B = append(kb.B, kvSeparatorChar)
ts.Seek(kb.B)
for ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) {
// Nothing found.
return false
return io.EOF
}
v := ts.Item[len(kb.B):]
tail, err := dst.Unmarshal(v)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal TSID: %s", err)
return fmt.Errorf("cannot unmarshal TSID: %w", err)
}
if len(tail) > 0 {
logger.Panicf("FATAL: unexpected non-empty tail left after unmarshaling TSID: %X", tail)
return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
}
if dmis.Len() > 0 {
// Verify whether the dst is marked as deleted.
@ -1984,13 +1964,13 @@ func (is *indexSearch) getTSIDByMetricNameNoExtDB(dst *TSID, metricName []byte,
}
}
// Found valid dst.
return true
return nil
}
if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching TSID by metricName; searchPrefix %q: %s", kb.B, err)
return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
}
// Nothing found
return false
return io.EOF
}
func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
@ -2841,21 +2821,13 @@ const (
int64Max = int64((1 << 63) - 1)
)
func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) {
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.Next()
// Create per-day inverted index entries for TSID.
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateMetricNameToTSID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.B = encoding.MarshalUint64(ii.B, metricID)
ii.Next()
// Create per-day inverted index entries for metricID.
@ -2863,12 +2835,9 @@ func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricNa
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
ii.registerTagIndexes(kb.B, mn, metricID)
is.db.tb.AddItems(ii.Items)
// Register the (date, metricID) entry in the cache,
// so next time the entry is found there instead of searching for it in the indexdb.
is.db.s.dateMetricIDCache.Set(date, tsid.MetricID)
is.db.s.dateMetricIDCache.Set(date, metricID)
}
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
@ -2978,41 +2947,22 @@ func reverseBytes(dst, src []byte) []byte {
return dst
}
func (is *indexSearch) hasDateMetricID(date, metricID uint64) bool {
if is.hasDateMetricIDNoExtDB(date, metricID) {
// Fast path - the (date, metricID) entry is found in the current indexdb.
return true
}
// Slow path - search for the (date, metricID) in the previous indexdb.
ok := false
deadline := is.deadline
is.db.doExtDB(func(extDB *indexDB) {
is := extDB.getIndexSearch(deadline)
ok = is.hasDateMetricIDNoExtDB(date, metricID)
extDB.putIndexSearch(is)
})
return ok
}
func (is *indexSearch) hasDateMetricIDNoExtDB(date, metricID uint64) bool {
func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = encoding.MarshalUint64(kb.B, metricID)
err := ts.FirstItemWithPrefix(kb.B)
if err == nil {
if string(ts.Item) != string(kb.B) {
logger.Panicf("FATAL: unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
if err := ts.FirstItemWithPrefix(kb.B); err != nil {
if err == io.EOF {
return false, nil
}
// Fast path - the (date, metricID) entry is found in the current indexdb.
return true
return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err)
}
if err != io.EOF {
logger.Panicf("FATAL: unexpected error when searching for (date=%s, metricID=%d) entry: %s", dateToString(date), metricID, err)
if string(ts.Item) != string(kb.B) {
return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
}
return false
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,

View file

@ -22,49 +22,6 @@ import (
"github.com/VictoriaMetrics/fastcache"
)
func TestCanPostponeIndexCreation(t *testing.T) {
f := func(currentTime, rotationTimestamp, date, metricID uint64, resultExpected bool) {
t.Helper()
result := canPostponeIndexCreation(currentTime, rotationTimestamp, date, metricID)
if result != resultExpected {
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
}
}
// the date doesn't match rotationTimestamp
f(0, 0, 1, 0, false)
// the rotationTimestamp is less than an hour to the day start
f(0, 0, 0, 0, false)
// the rotationTimestamp is less than an hour before the next day start
f(23*3600+10, 23*3600+1, 0, 0, false)
// the time for index creation is over
f(0, 10*3600, 0, 0, false)
f(23*3600+1, 10*3600, 0, 0, false)
f(24*3600, 10*3600, 0, 0, false)
f(1234567890, 0, 0, 0, false)
// the time for index creation for the given metricID is over
f(23*3600-1, 4*3600, 0, 1, false)
f(22*3600, 4*3600, 0, 1, false)
f(10*3600, 4*3600, 0, 1, false)
// it is ok to create index for the given metricID
f(4*3600, 4*3600, 0, 1, true)
f(7*3600, 4*3600, 0, 1, true)
f(9*3600, 4*3600, 0, 1, true)
// different metricID
f(10*3600, 4*3600, 0, 12345, true)
f(14*3600, 4*3600, 0, 12345, true)
f(15*3600, 4*3600, 0, 12345, true)
f(16*3600, 4*3600, 0, 12345, false)
f(17*3600, 4*3600, 0, 12345, false)
f(20*3600, 4*3600, 0, 12345, false)
}
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
f := func(metricIDs []uint64) {
t.Helper()
@ -635,8 +592,6 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
date := uint64(timestampFromTime(time.Now())) / msecPerDay
var metricNameBuf []byte
var metricNameRawBuf []byte
for i := 0; i < 4e2+1; i++ {
@ -657,19 +612,20 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
// Create tsid for the metricName.
var genTSID generationTSID
if err := is.GetOrCreateTSIDByName(&genTSID, metricNameBuf, metricNameRawBuf, date); err != nil {
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
}
mns = append(mns, mn)
tsids = append(tsids, genTSID.TSID)
tsids = append(tsids, tsid)
}
// fill Date -> MetricID cache
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
is.createPerDayIndexes(date, tsid, &mns[i])
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
}
// Flush index to disk, so it becomes visible for search
@ -688,9 +644,8 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return false
}
currentTime := timestampFromTime(time.Now())
timeseriesCounters := make(map[uint64]bool)
var genTSID generationTSID
var tsidCopy TSID
var metricNameCopy []byte
allLabelNames := make(map[string]bool)
for i := range mns {
@ -703,29 +658,26 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
mn.sortTags()
metricName := mn.Marshal(nil)
is := db.getIndexSearch(noDeadline)
if !is.getTSIDByMetricName(&genTSID, metricName, uint64(currentTime)/msecPerDay) {
return fmt.Errorf("cannot obtain tsid #%d for mn %s", i, mn)
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil {
return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err)
}
db.putIndexSearch(is)
if isConcurrent {
// Copy tsid.MetricID, since multiple TSIDs may match
// the same mn in concurrent mode.
genTSID.TSID.MetricID = tsid.MetricID
tsidCopy.MetricID = tsid.MetricID
}
if !reflect.DeepEqual(tsid, &genTSID.TSID) {
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &genTSID.TSID, tsid)
if !reflect.DeepEqual(tsid, &tsidCopy) {
return fmt.Errorf("unexpected tsid for mn:\n%s\ngot\n%+v\nwant\n%+v", mn, &tsidCopy, tsid)
}
// Search for metric name for the given metricID.
var err error
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], genTSID.TSID.MetricID)
metricNameCopy, err = db.searchMetricNameWithCache(metricNameCopy[:0], tsidCopy.MetricID)
if err != nil {
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", genTSID.TSID.MetricID, i, err)
return fmt.Errorf("error in searchMetricNameWithCache for metricID=%d; i=%d: %w", tsidCopy.MetricID, i, err)
}
if !bytes.Equal(metricName, metricNameCopy) {
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", genTSID.TSID.MetricID, metricNameCopy, metricName)
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
}
// Try searching metric name for non-existent MetricID.
@ -786,6 +738,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Try tag filters.
currentTime := timestampFromTime(time.Now())
tr := TimeRange{
MinTimestamp: currentTime - msecPerDay,
MaxTimestamp: currentTime + msecPerDay,
@ -1507,11 +1460,9 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
const metricRowsN = 1000
currentDayTimestamp := (time.Now().UnixMilli() / msecPerDay) * msecPerDay
timeMin := currentDayTimestamp - 24*3600*1000
timeMax := currentDayTimestamp + 24*3600*1000
mrs := testGenerateMetricRows(r, metricRowsN, timeMin, timeMax)
// use min-max timestamps of 1month range to create smaller number of partitions
timeMin, timeMax := time.Now().Add(-730*time.Hour), time.Now()
mrs := testGenerateMetricRows(r, metricRowsN, timeMin.UnixMilli(), timeMax.UnixMilli())
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
@ -1576,14 +1527,17 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
s.getTSIDFromCache(&genTSID, mr.MetricNameRaw)
entriesByGeneration[genTSID.generation]++
}
if len(entriesByGeneration) != 2 {
if len(entriesByGeneration) > 2 {
t.Fatalf("expecting two generations; got %d", entriesByGeneration)
}
prevEntries := entriesByGeneration[prevGeneration]
currEntries := entriesByGeneration[dbNew.generation]
totalEntries := prevEntries + currEntries
if float64(currEntries)/float64(totalEntries) < 0.1 {
t.Fatalf("too small share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
if totalEntries != metricRowsN {
t.Fatalf("unexpected number of entries in tsid cache; got %d; want %d", totalEntries, metricRowsN)
}
if float64(currEntries)/float64(totalEntries) > 0.1 {
t.Fatalf("too big share of entries in the new generation; currEntries=%d, prevEntries=%d", currEntries, prevEntries)
}
}
@ -1647,12 +1601,12 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
var genTSID generationTSID
if err := is.GetOrCreateTSIDByName(&genTSID, metricNameBuf, metricNameRawBuf, 0); err != nil {
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
}
mns = append(mns, mn)
tsids = append(tsids, genTSID.TSID)
tsids = append(tsids, tsid)
}
// Add the metrics to the per-day stores
@ -1661,7 +1615,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
for i := range tsids {
tsid := &tsids[i]
metricIDs.Add(tsid.MetricID)
is.createPerDayIndexes(date, tsid, &mns[i])
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
}
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs

View file

@ -58,7 +58,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var mn MetricName
var genTSID generationTSID
var tsid TSID
// The most common tags.
mn.Tags = []Tag{
@ -72,14 +72,14 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
startOffset := 0
for pb.Next() {
benchmarkIndexDBAddTSIDs(db, &genTSID, &mn, startOffset, recordsPerLoop)
benchmarkIndexDBAddTSIDs(db, &tsid, &mn, startOffset, recordsPerLoop)
startOffset += recordsPerLoop
}
})
b.StopTimer()
}
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte
var metricNameRaw []byte
is := db.getIndexSearch(noDeadline)
@ -92,7 +92,7 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricNa
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.GetOrCreateTSIDByName(genTSID, metricName, metricNameRaw, 0); err != nil {
if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
panic(fmt.Errorf("cannot insert record: %w", err))
}
}
@ -129,7 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.createTSIDByMetricName(&tsid, metricName, metricNameRaw, 0); err != nil {
if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
}
@ -302,7 +302,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
value := fmt.Sprintf("value_%d", i)
mn.AddTag(key, value)
}
var genTSID generationTSID
var tsid TSID
var metricName []byte
var metricNameRaw []byte
@ -312,7 +312,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
metricNameRaw = mn.marshalRaw(metricName[:0])
if err := is.GetOrCreateTSIDByName(&genTSID, metricName, metricNameRaw, 0); err != nil {
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
}
@ -321,7 +321,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var tsidLocal generationTSID
var tsidLocal TSID
var metricNameLocal []byte
var metricNameLocalRaw []byte
mnLocal := mn

View file

@ -1176,7 +1176,8 @@ func (s *Storage) DeleteSeries(qt *querytracer.Tracer, tfss []*TagFilters) (int,
if err != nil {
return deletedCount, fmt.Errorf("cannot delete tsids: %w", err)
}
// Do not reset MetricName->TSID cache, since it is already reset inside DeleteTSIDs.
// Do not reset MetricName->TSID cache in order to prevent from adding new data points
// to deleted time series in Storage.add, since it is already reset inside DeleteTSIDs.
// Do not reset MetricID->MetricName cache, since it must be used only
// after filtering out deleted metricIDs.
@ -1530,9 +1531,9 @@ var metricRowsInsertCtxPool sync.Pool
const maxMetricRowsPerBlock = 8000
// RegisterMetricNames registers all the metric names from mrs in the indexdb, so they can be queried later.
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
//
// The MetricRow.Timestamp is used for registering the metric name at the given day according to the timestamp.
// The MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.
// Th MetricRow.Value field is ignored.
func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) error {
qt = qt.NewChild("registering %d series", len(mrs))
@ -1563,12 +1564,13 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) e
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
date := uint64(mr.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID, metricName, mr.MetricNameRaw, date); err != nil {
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
if errors.Is(err, errSeriesCardinalityExceeded) {
continue
}
return fmt.Errorf("cannot create TSID for metricName %q: %w", metricName, err)
}
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
return nil
@ -1627,8 +1629,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// Search for TSID for the given mr.MetricNameRaw and store it at r.TSID.
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
@ -1636,10 +1636,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
// The TSID for mr.MetricNameRaw has been found in the cache.
if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
// Skip r, since it exceeds cardinality limit
j--
continue
}
@ -1652,21 +1649,18 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
prevMetricNameRaw = mr.MetricNameRaw
if genTSID.generation != idb.generation {
// The found TSID is from the previous cache generation (e.g. from the previous indexdb),
// so attempt to create TSID indexes for the current date.
// The found entry is from the previous cache generation,
// so attempt to re-populate the current generation with this entry.
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
date := uint64(r.Timestamp) / msecPerDay
created, err := is.maybeCreateIndexes(&genTSID, mr.MetricNameRaw, date)
created, err := is.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw, date)
if err != nil {
return fmt.Errorf("cannot create indexes: %w", err)
}
if created {
genTSID.generation = idb.generation
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
}
// It is OK if TSID indexes aren't created for the current date.
// This means they exist for the current date in the previous indexdb,
// and there is enough time for creating them in the current indexdb
// until the next day starts.
}
continue
}
@ -1712,10 +1706,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
slowInsertsCount++
date := uint64(r.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
j--
if errors.Is(err, errSeriesCardinalityExceeded) {
// Skip the row, since it exceeds the configured cardinality limit.
continue
}
// Do not stop adding rows on error - just skip invalid row.
@ -1726,7 +1719,8 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
continue
}
r.TSID = genTSID.TSID
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
@ -1881,9 +1875,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
type pendingDateMetricID struct {
date uint64
tsid *TSID
mr *MetricRow
date uint64
metricID uint64
mr *MetricRow
}
var pendingDateMetricIDs []pendingDateMetricID
var pendingNextDayMetricIDs []uint64
@ -1903,7 +1897,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
prevDate = date
prevMetricID = metricID
if hour == hm.hour {
// The row belongs to the current hour. Check for the current hour cache.
// The r belongs to the current hour. Check for the current hour cache.
if hm.m.Has(metricID) {
// Fast path: the metricID is in the current hour cache.
// This means the metricID has been already added to per-day inverted index.
@ -1916,9 +1910,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
if p < pMin && !nextDayMetricIDs.Has(metricID) {
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1,
tsid: &r.TSID,
mr: mrs[i],
date: date + 1,
metricID: metricID,
mr: mrs[i],
})
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
}
@ -1938,9 +1932,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
tsid: &r.TSID,
mr: mrs[i],
date: date,
metricID: metricID,
mr: mrs[i],
})
}
if len(pendingNextDayMetricIDs) > 0 {
@ -1954,7 +1948,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
s.pendingHourEntriesLock.Unlock()
}
if len(pendingDateMetricIDs) == 0 {
// Fast path - there are no new (date, metricID) entries.
// Fast path - there are no new (date, metricID) entries in rows.
return nil
}
@ -1968,22 +1962,27 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
if a.date != b.date {
return a.date < b.date
}
return a.tsid.MetricID < b.tsid.MetricID
return a.metricID < b.metricID
})
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
defer idb.putIndexSearch(is)
var firstError error
dateMetricIDsForCache := make([]dateMetricID, 0, len(pendingDateMetricIDs))
mn := GetMetricName()
for _, dmid := range pendingDateMetricIDs {
date := dmid.date
metricID := dmid.tsid.MetricID
if !is.hasDateMetricID(date, metricID) {
metricID := dmid.metricID
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
if firstError == nil {
firstError = fmt.Errorf("error when locating (date=%s, metricID=%d) in database: %w", dateToString(date), metricID, err)
}
continue
}
if !ok {
// The (date, metricID) entry is missing in the indexDB. Add it there together with per-day indexes.
// It is OK if the (date, metricID) entry is added multiple times to indexdb
// It is OK if the (date, metricID) entry is added multiple times to db
// by concurrent goroutines.
if err := mn.UnmarshalRaw(dmid.mr.MetricNameRaw); err != nil {
if firstError == nil {
@ -1992,13 +1991,12 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue
}
mn.sortTags()
is.createPerDayIndexes(date, dmid.tsid, mn)
} else {
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,
metricID: metricID,
})
is.createPerDayIndexes(date, metricID, mn)
}
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,
metricID: metricID,
})
}
PutMetricName(mn)
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.