From 6685f6ce7ce8909336da722edf3d0b0a48ff749c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Jul 2023 23:13:21 -0700 Subject: [PATCH] lib/storage: move series registration in caches from createAllIndexesForMetricName into a separate function - putSeriesToCache This makes the code more clear and easier to read This is a follow-up for 7094fa38bc207c7bd7330ea8a834310a310ce5e3 --- lib/storage/index_db_test.go | 10 ++------- lib/storage/index_db_timing_test.go | 15 +++----------- lib/storage/storage.go | 32 +++++++++++++++-------------- 3 files changed, 22 insertions(+), 35 deletions(-) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index e9cee2056..58185d7ca 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -580,7 +580,6 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa date := uint64(timestampFromTime(time.Now())) / msecPerDay var metricNameBuf []byte - var metricNameRawBuf []byte for i := 0; i < 401; i++ { var mn MetricName @@ -596,14 +595,12 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa } mn.sortTags() metricNameBuf = mn.Marshal(metricNameBuf[:0]) - metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0]) // Create tsid for the metricName. var genTSID generationTSID if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { generateTSID(&genTSID.TSID, &mn) - genTSID.generation = db.generation - db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date) + createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date) } mns = append(mns, mn) @@ -1531,7 +1528,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { now := uint64(timestampFromTime(theDay)) baseDate := now / msecPerDay var metricNameBuf []byte - var metricNameRawBuf []byte perDayMetricIDs := make(map[uint64]*uint64set.Set) var allMetricIDs uint64set.Set labelNames := []string{ @@ -1566,12 +1562,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { mn.sortTags() metricNameBuf = mn.Marshal(metricNameBuf[:0]) - metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0]) var genTSID generationTSID if !is.getTSIDByMetricName(&genTSID, metricNameBuf, date) { generateTSID(&genTSID.TSID, &mn) - genTSID.generation = db.generation - db.s.createAllIndexesForMetricName(is, &mn, metricNameRawBuf, &genTSID, date) + createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date) } metricIDs.Add(genTSID.TSID.MetricID) } diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index dc40c1571..0a2c48c19 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -76,7 +76,6 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) { func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricName, startOffset, recordsPerLoop int) { date := uint64(0) - var metricNameRaw []byte is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) for i := 0; i < recordsPerLoop; i++ { @@ -86,10 +85,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, genTSID *generationTSID, mn *MetricNa } mn.sortTags() - metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) generateTSID(&genTSID.TSID, mn) - genTSID.generation = db.generation - db.s.createAllIndexesForMetricName(is, mn, metricNameRaw, genTSID, date) + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) } } @@ -104,7 +101,6 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) var mn MetricName - var metricNameRaw []byte var genTSID generationTSID date := uint64(0) addSeries := func(kvs ...string) { @@ -113,10 +109,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { mn.AddTag(kvs[i], kvs[i+1]) } mn.sortTags() - metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) generateTSID(&genTSID.TSID, &mn) - genTSID.generation = db.generation - db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date) + createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date) } for n := 0; n < 10; n++ { ns := strconv.Itoa(n) @@ -284,17 +278,14 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) { mn.sortTags() var genTSID generationTSID - var metricNameRaw []byte date := uint64(12345) is := db.getIndexSearch(noDeadline) defer db.putIndexSearch(is) for i := 0; i < recordsCount; i++ { - metricNameRaw = mn.marshalRaw(metricNameRaw[:0]) generateTSID(&genTSID.TSID, &mn) - genTSID.generation = db.generation - db.s.createAllIndexesForMetricName(is, &mn, metricNameRaw, &genTSID, date) + createAllIndexesForMetricName(is, &mn, &genTSID.TSID, date) } db.s.DebugFlush() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 818de037d..5188b2def 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1579,8 +1579,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { } mn.sortTags() + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ } continue @@ -1611,13 +1612,11 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { if genTSID.generation != idb.generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ - } else { - // Store the found TSID in the cache, so future rows for that TSID are ingested via fast path. - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) } + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) continue } @@ -1631,8 +1630,9 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { // Schedule creating TSID indexes instead of creating them synchronously. // This should keep stable the ingestion rate when new time series are ingested. + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) } atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) @@ -1740,8 +1740,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci } mn.sortTags() + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ slowInsertsCount++ } @@ -1776,13 +1777,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci if genTSID.generation != idb.generation { // The found TSID is from the previous indexdb. Create it in the current indexdb. + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) seriesRepopulated++ - } else { - // Store the found TSID in the cache, so future rows for that TSID are ingested via fast path. - s.putTSIDToCache(&genTSID, mr.MetricNameRaw) } + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) r.TSID = genTSID.TSID prevTSID = genTSID.TSID @@ -1799,8 +1798,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci continue } + createAllIndexesForMetricName(is, mn, &genTSID.TSID, date) genTSID.generation = idb.generation - s.createAllIndexesForMetricName(is, mn, mr.MetricNameRaw, &genTSID, date) + s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) newSeriesCount++ r.TSID = genTSID.TSID @@ -1845,10 +1845,12 @@ func SetLogNewSeries(ok bool) { var logNewSeries = false -func (s *Storage) createAllIndexesForMetricName(is *indexSearch, mn *MetricName, metricNameRaw []byte, genTSID *generationTSID, date uint64) { - is.createGlobalIndexes(&genTSID.TSID, mn) - is.createPerDayIndexes(date, &genTSID.TSID, mn) +func createAllIndexesForMetricName(is *indexSearch, mn *MetricName, tsid *TSID, date uint64) { + is.createGlobalIndexes(tsid, mn) + is.createPerDayIndexes(date, tsid, mn) +} +func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID, date uint64) { // Store the TSID for for the current indexdb into cache, // so future rows for that TSID are ingested via fast path. s.putTSIDToCache(genTSID, metricNameRaw)