lib/storage: move series registration in caches from createAllIndexesForMetricName into a separate function - putSeriesToCache

This makes the code clearer and easier to read.

This is a follow-up for 7094fa38bc
This commit is contained in:
Aliaksandr Valialkin 2023-07-13 23:13:21 -07:00
parent 9baecdcd33
commit 6685f6ce7c
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 22 additions and 35 deletions

View file

@ -580,7 +580,6 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
date := uint64(timestampFromTime(time.Now())) / msecPerDay date := uint64(timestampFromTime(time.Now())) / msecPerDay
var metricNameBuf []byte var metricNameBuf []byte
var metricNameRawBuf []byte
for i := 0; i < 401; i++ { for i := 0; i < 401; i++ {
var mn MetricName var mn MetricName
@ -596,14 +595,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
} }
mn.sortTags() mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0]) metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
// Create tsid for the metricName. // Create tsid for the metricName.
var genTSID generationTSID var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn) generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
} }
mns = append(mns, mn) mns = append(mns, mn)
@ -1531,7 +1528,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
now := uint64(timestampFromTime(theDay)) now := uint64(timestampFromTime(theDay))
baseDate := now / msecPerDay baseDate := now / msecPerDay
var metricNameBuf []byte var metricNameBuf []byte
var metricNameRawBuf []byte
perDayMetricIDs := make(map[uint64]*uint64set.Set) perDayMetricIDs := make(map[uint64]*uint64set.Set)
var allMetricIDs uint64set.Set var allMetricIDs uint64set.Set
labelNames := []string{ labelNames := []string{
@ -1566,12 +1562,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
mn.sortTags() mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0]) metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
var genTSID generationTSID var genTSID generationTSID
if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) {
generateTSID(&genTSID.TSID, &mn) generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date)
} }
metricIDs.Add(genTSID.TSID.MetricID) metricIDs.Add(genTSID.TSID.MetricID)
} }

View file

@ -76,7 +76,6 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) { func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) {
date := uint64(0) date := uint64(0)
var metricNameRaw []byte
is := db.getIndexSearch(noDeadline) is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
for i := 0; i < recordsPerLoop; i++ { for i := 0; i < recordsPerLoop; i++ {
@ -86,10 +85,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricNa
} }
mn.sortTags() mn.sortTags()
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
generateTSID(&genTSID.TSID, mn) generateTSID(&genTSID.TSID, mn)
genTSID.generation = db.generation createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date)
} }
} }
@ -104,7 +101,6 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
is := db.getIndexSearch(noDeadline) is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
var mn MetricName var mn MetricName
var metricNameRaw []byte
var genTSID generationTSID var genTSID generationTSID
date := uint64(0) date := uint64(0)
addSeries := func(kvs ...string) { addSeries := func(kvs ...string) {
@ -113,10 +109,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
mn.AddTag(kvs[i], kvs[i+1]) mn.AddTag(kvs[i], kvs[i+1])
} }
mn.sortTags() mn.sortTags()
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
generateTSID(&genTSID.TSID, &mn) generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
} }
for n := 0; n < 10; n++ { for n := 0; n < 10; n++ {
ns := strconv.Itoa(n) ns := strconv.Itoa(n)
@ -284,17 +278,14 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
mn.sortTags() mn.sortTags()
var genTSID generationTSID var genTSID generationTSID
var metricNameRaw []byte
date := uint64(12345) date := uint64(12345)
is := db.getIndexSearch(noDeadline) is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is) defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ { for i := 0; i < recordsCount; i++ {
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
generateTSID(&genTSID.TSID, &mn) generateTSID(&genTSID.TSID, &mn)
genTSID.generation = db.generation createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date)
db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date)
} }
db.s.DebugFlush() db.s.DebugFlush()

View file

@ -1579,8 +1579,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
} }
mn.sortTags() mn.sortTags()
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++ seriesRepopulated++
} }
continue continue
@ -1611,13 +1612,11 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
if genTSID.generation != idb.generation { if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb. // The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++ seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
} }
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
continue continue
} }
@ -1631,8 +1630,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
// Schedule creating TSID indexes instead of creating them synchronously. // Schedule creating TSID indexes instead of creating them synchronously.
// This should keep stable the ingestion rate when new time series are ingested. // This should keep stable the ingestion rate when new time series are ingested.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
} }
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
@ -1740,8 +1740,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
} }
mn.sortTags() mn.sortTags()
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++ seriesRepopulated++
slowInsertsCount++ slowInsertsCount++
} }
@ -1776,13 +1777,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
if genTSID.generation != idb.generation { if genTSID.generation != idb.generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb. // The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++ seriesRepopulated++
} else {
// Store the found TSID in the cache, so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
} }
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
r.TSID = genTSID.TSID r.TSID = genTSID.TSID
prevTSID = genTSID.TSID prevTSID = genTSID.TSID
@ -1799,8 +1798,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue continue
} }
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation genTSID.generation = idb.generation
s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
newSeriesCount++ newSeriesCount++
r.TSID = genTSID.TSID r.TSID = genTSID.TSID
@ -1845,10 +1845,12 @@ func SetLogNewSeries(ok bool) {
var logNewSeries = false var logNewSeries = false
func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) { func createAllIndexesForMetricName(is *indexSearch, mn *MetricName, tsid *TSID, date uint64) {
is.createGlobalIndexes(&genTSID.TSID, mn) is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, &genTSID.TSID, mn) is.createPerDayIndexes(date, tsid, mn)
}
func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID, date uint64) {
// Store the TSID for the current indexdb into cache, // Store the TSID for the current indexdb into cache,
// so future rows for that TSID are ingested via fast path. // so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(genTSID, metricNameRaw) s.putTSIDToCache(genTSID, metricNameRaw)