lib/storage: introduce per-day MetricName=>TSID index (#4252)

The new index replaces the global MetricName=>TSID index
used for locating TSIDs on the ingestion path.
For installations with a high ingestion and churn rate, the global
MetricName=>TSID index can grow enormously, making index lookups
too expensive. This also results in bigger-than-expected cache
growth for indexdb blocks.

The new per-day index is expected to be much smaller and more
efficient. This should improve ingestion speed and reliability
during re-routing in the cluster.

The possible downside is increased disk usage, since the per-day
index is more expensive in terms of disk space than the global index.
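
For reference, a minimal sketch of the new key layout versus the old one,
based on the key construction in this diff (buildPerDayDateMetricNameSeekKey
is an illustrative helper name, not a function added by this commit):

package storage

import "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"

// A stored per-day entry has the form:
//   nsPrefixDateMetricNameToTSID | date (uint64) | MetricName | kvSeparatorChar | TSID
// while the old global entry had no date component:
//   nsPrefixMetricNameToTSID | MetricName | kvSeparatorChar | TSID
// buildPerDayDateMetricNameSeekKey builds the seek prefix (everything up to
// and including the separator) used to locate such an entry, so a lookup
// scans only the keys created for a single day.
func buildPerDayDateMetricNameSeekKey(dst []byte, date uint64, metricName []byte) []byte {
	dst = append(dst, nsPrefixDateMetricNameToTSID)
	dst = encoding.MarshalUint64(dst, date)
	dst = append(dst, metricName...)
	dst = append(dst, kvSeparatorChar)
	return dst
}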

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Roman Khavronenko, 2023-05-17 00:46:42 +02:00, committed by GitHub
commit 2ce02a7fe6
parent 39ba4fc1c4
3 changed files with 50 additions and 40 deletions

@@ -31,7 +31,11 @@ import (
const (
// Prefix for MetricName->TSID entries.
nsPrefixMetricNameToTSID = 0
// This index was substituted with nsPrefixDateMetricNameToTSID
// for the sake of resource efficiency. The intention is to speed up
// index lookups on busy installations. Looking up in the per-day
// index is expected to be much more efficient than in the global index.
// nsPrefixMetricNameToTSID = 0
// Prefix for Tag->MetricID entries.
nsPrefixTagToMetricIDs = 1
@@ -50,6 +54,9 @@ const (
// Prefix for (Date,Tag)->MetricID entries.
nsPrefixDateTagToMetricIDs = 6
// Prefix for (Date,MetricName)->TSID entries.
nsPrefixDateMetricNameToTSID = 7
)
// indexDB represents an index db.
@@ -398,7 +405,7 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date
}
mn.sortTags()
is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, tsid.MetricID, mn)
is.createPerDayIndexes(date, tsid, mn)
PutMetricName(mn)
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil
@@ -476,9 +483,9 @@ func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
//
// It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte, date uint64) error {
is := db.getIndexSearch(noDeadline)
err := is.getTSIDByMetricName(dst, metricName)
err := is.getTSIDByMetricName(dst, metricName, date)
db.putIndexSearch(is)
if err == nil {
return nil
@@ -518,7 +525,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRa
// This should improve insertion performance for big batches
// of new time series.
if is.tsidByNameMisses < 100 {
err := is.getTSIDByMetricName(dst, metricName)
err := is.getTSIDByMetricName(dst, metricName, date)
if err == nil {
// Fast path - the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
@@ -585,7 +592,7 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b
return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
}
created, err := is.db.getOrCreateTSID(dst, metricName, mn)
created, err := is.db.getOrCreateTSID(dst, metricName, mn, date)
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
@@ -593,7 +600,7 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b
return err
}
is.createGlobalIndexes(dst, mn)
is.createPerDayIndexes(date, dst.MetricID, mn)
is.createPerDayIndexes(date, dst, mn)
// There is no need in invalidating tag cache, since it is invalidated
// on db.tb flush via invalidateTagFiltersCache flushCallback passed to mergeset.MustOpenTable.
@@ -620,12 +627,12 @@ var logNewSeries = false
// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
//
// Returns true if TSID was created or false if TSID was in extDB
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName, date uint64) (bool, error) {
// Search the TSID in the external storage.
// This is usually the db from the previous period.
var err error
if db.doExtDB(func(extDB *indexDB) {
err = extDB.getTSIDByNameNoCreate(dst, metricName)
err = extDB.getTSIDByNameNoCreate(dst, metricName, date)
}) {
if err == nil {
// The TSID has been found in the external storage.
@@ -664,13 +671,6 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
@@ -1935,11 +1935,12 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte, date uint64) error {
dmis := is.db.s.getDeletedMetricIDs()
ts := &is.ts
kb := &is.kb
kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
kb.B = append(kb.B[:0], nsPrefixDateMetricNameToTSID)
kb.B = encoding.MarshalUint64(kb.B, date)
kb.B = append(kb.B, metricName...)
kb.B = append(kb.B, kvSeparatorChar)
ts.Seek(kb.B)
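
The extra date parameter is expressed in days since the Unix epoch, the same
unit used by the rest of the per-day indexes. A minimal sketch of how a caller
in this package might drive the lookup from a millisecond timestamp, mirroring
the updated tests further down (lookupTSIDForDay is an illustrative name, not
part of this commit):

func lookupTSIDForDay(is *indexSearch, metricName []byte, timestampMs int64) (TSID, error) {
	var tsid TSID
	// msecPerDay is the package constant for milliseconds per day,
	// so the division yields days since the Unix epoch.
	date := uint64(timestampMs) / msecPerDay
	err := is.getTSIDByMetricName(&tsid, metricName, date)
	return tsid, err
}
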
@@ -2821,13 +2822,21 @@ const (
int64Max = int64((1 << 63) - 1)
)
func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
func (is *indexSearch) createPerDayIndexes(date uint64, tsid *TSID, mn *MetricName) {
ii := getIndexItems()
defer putIndexItems(ii)
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = encoding.MarshalUint64(ii.B, metricID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.Next()
// Create per-day inverted index entries for TSID.
ii.B = marshalCommonPrefix(ii.B, nsPrefixDateMetricNameToTSID)
ii.B = encoding.MarshalUint64(ii.B, date)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create per-day inverted index entries for metricID.
@@ -2835,9 +2844,9 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName
defer kbPool.Put(kb)
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, metricID)
ii.registerTagIndexes(kb.B, mn, tsid.MetricID)
is.db.tb.AddItems(ii.Items)
is.db.s.dateMetricIDCache.Set(date, metricID)
is.db.s.dateMetricIDCache.Set(date, tsid.MetricID)
}
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
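
With this change a single createPerDayIndexes call registers three kinds of
entries for a (date, series) pair: date->metricID, (date, MetricName)->TSID
and (date, tag)->metricID, and it also populates dateMetricIDCache. A hedged
usage sketch in the spirit of the tests and of updatePerDateData below
(registerSeriesForDay is an illustrative helper, not part of this commit):

func registerSeriesForDay(db *indexDB, tsid *TSID, mn *MetricName, timestampMs int64) {
	is := db.getIndexSearch(noDeadline)
	defer db.putIndexSearch(is)

	// The callers in this diff sort the tags before registering
	// per-day entries, so do the same here.
	mn.sortTags()
	date := uint64(timestampMs) / msecPerDay // days since the Unix epoch
	is.createPerDayIndexes(date, tsid, mn)
}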

@@ -592,6 +592,8 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
date := uint64(timestampFromTime(time.Now())) / msecPerDay
var metricNameBuf []byte
var metricNameRawBuf []byte
for i := 0; i < 4e2+1; i++ {
@@ -613,7 +615,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
// Create tsid for the metricName.
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, date); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
}
@@ -622,10 +624,9 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
}
// fill Date -> MetricID cache
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
is.createPerDayIndexes(date, tsid, &mns[i])
}
// Flush index to disk, so it becomes visible for search
@@ -644,6 +645,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return false
}
currentTime := timestampFromTime(time.Now())
timeseriesCounters := make(map[uint64]bool)
var tsidCopy TSID
var metricNameCopy []byte
@@ -658,7 +660,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
mn.sortTags()
metricName := mn.Marshal(nil)
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName); err != nil {
if err := db.getTSIDByNameNoCreate(&tsidCopy, metricName, uint64(currentTime)/msecPerDay); err != nil {
return fmt.Errorf("cannot obtain tsid #%d for mn %s: %w", i, mn, err)
}
if isConcurrent {
@@ -738,7 +740,6 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Try tag filters.
currentTime := timestampFromTime(time.Now())
tr := TimeRange{
MinTimestamp: currentTime - msecPerDay,
MaxTimestamp: currentTime + msecPerDay,
@@ -1615,7 +1616,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
for i := range tsids {
tsid := &tsids[i]
metricIDs.Add(tsid.MetricID)
is.createPerDayIndexes(date, tsid.MetricID, &mns[i])
is.createPerDayIndexes(date, tsid, &mns[i])
}
allMetricIDs.Union(&metricIDs)
perDayMetricIDs[date] = &metricIDs

@@ -1875,9 +1875,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// pMin linearly increases from 0 to 1 during the last hour of the day.
pMin := (float64(ts%(3600*24)) / 3600) - 23
type pendingDateMetricID struct {
date uint64
metricID uint64
mr *MetricRow
date uint64
tsid *TSID
mr *MetricRow
}
var pendingDateMetricIDs []pendingDateMetricID
var pendingNextDayMetricIDs []uint64
@@ -1910,9 +1910,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
p := float64(uint32(fastHashUint64(metricID))) / (1 << 32)
if p < pMin && !nextDayMetricIDs.Has(metricID) {
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date + 1,
metricID: metricID,
mr: mrs[i],
date: date + 1,
tsid: &r.TSID,
mr: mrs[i],
})
pendingNextDayMetricIDs = append(pendingNextDayMetricIDs, metricID)
}
@@ -1932,9 +1932,9 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
// Slow path: store the (date, metricID) entry in the indexDB.
pendingDateMetricIDs = append(pendingDateMetricIDs, pendingDateMetricID{
date: date,
metricID: metricID,
mr: mrs[i],
date: date,
tsid: &r.TSID,
mr: mrs[i],
})
}
if len(pendingNextDayMetricIDs) > 0 {
@@ -1962,7 +1962,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
if a.date != b.date {
return a.date < b.date
}
return a.metricID < b.metricID
return a.tsid.MetricID < b.tsid.MetricID
})
idb := s.idb()
is := idb.getIndexSearch(noDeadline)
@@ -1972,7 +1972,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
mn := GetMetricName()
for _, dmid := range pendingDateMetricIDs {
date := dmid.date
metricID := dmid.metricID
metricID := dmid.tsid.MetricID
ok, err := is.hasDateMetricID(date, metricID)
if err != nil {
if firstError == nil {
@@ -1991,7 +1991,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
continue
}
mn.sortTags()
is.createPerDayIndexes(date, metricID, mn)
is.createPerDayIndexes(date, dmid.tsid, mn)
}
dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{
date: date,