From 270ad39359f834c0262000c0b32a221ee93d8136 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 20 Jun 2022 13:47:43 +0300
Subject: [PATCH] lib/storage: properly take into account already registered
 series when `-storage.maxHourlySeries` or `-storage.maxDailySeries` limits
 are enabled

The commit 5fb45173ae2c3dae51264c0ddd7e0b917ecc519e takes into account only
newly registered series when applying cardinality limits. This means that the
cardinality limit could be exceeded by series that are already registered.

This commit restores accounting for already registered series when applying
cardinality limits.
---
 lib/storage/index_db.go             | 14 ++++++--------
 lib/storage/index_db_test.go        | 20 +++++++++++++------
 lib/storage/index_db_timing_test.go | 16 +++++++++++----
 lib/storage/storage.go              | 30 +++++++++++++++++++----------
 4 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 5b0b98d3f..e420e15ae 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -537,7 +537,7 @@ type indexSearch struct {
 //
 // It also registers the metricName in global and per-day indexes
 // for the given date if the metricName->TSID entry is missing in the index.
-func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date uint64) error {
+func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
 	// A hack: skip searching for the TSID after many serial misses.
 	// This should improve insertion performance for big batches
 	// of new time series.
@@ -545,7 +545,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
 		err := is.getTSIDByMetricName(dst, metricName)
 		if err == nil {
 			is.tsidByNameMisses = 0
-			return nil
+			return is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw)
 		}
 		if err != io.EOF {
 			return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
@@ -562,7 +562,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
 	// TSID for the given name wasn't found. Create it.
 	// It is OK if duplicate TSID for mn is created by concurrent goroutines.
 	// Metric results will be merged by mn after TableSearch.
-	if err := is.createTSIDByName(dst, metricName, date); err != nil {
+	if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
 		return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)
 	}
 	return nil
@@ -597,7 +597,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
 	db.indexSearchPool.Put(is)
 }
 
-func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint64) error {
+func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
 	mn := GetMetricName()
 	defer PutMetricName(mn)
 	if err := mn.Unmarshal(metricName); err != nil {
@@ -608,8 +608,8 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
 	if err != nil {
 		return fmt.Errorf("cannot generate TSID: %w", err)
 	}
-	if !is.db.s.registerSeriesCardinality(dst.MetricID, mn) {
-		return errSeriesCardinalityExceeded
+	if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
+		return err
 	}
 	if err := is.createGlobalIndexes(dst, mn); err != nil {
 		return fmt.Errorf("cannot create global indexes: %w", err)
@@ -631,8 +631,6 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
 	return nil
 }
 
-var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
-
 // SetLogNewSeries updates new series logging.
 //
 // This function must be called before any calling any storage functions.
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index 74bc69c3f..1fe8b8f80 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -622,7 +622,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
 	mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...)
 	mn.sortTags()
 	metricName := mn.Marshal(nil)
-	if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+	metricNameRaw := mn.marshalRaw(nil)
+	if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
 		return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup")
 	}
 
@@ -635,7 +636,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
 	}}
 	mn.sortTags()
 	metricName = mn.Marshal(nil)
-	if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+	metricNameRaw = mn.marshalRaw(nil)
+	if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
 		return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key")
 	}
 
@@ -648,7 +650,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
 	}}
 	mn.sortTags()
 	metricName = mn.Marshal(nil)
-	if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+	metricNameRaw = mn.marshalRaw(nil)
+	if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
 		return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value")
 	}
 
@@ -663,7 +666,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
 	}
 	mn.sortTags()
 	metricName = mn.Marshal(nil)
-	if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
+	metricNameRaw = mn.marshalRaw(nil)
+	if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
 		return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags")
 	}
 
@@ -679,6 +683,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
 	defer db.putIndexSearch(is)
 
 	var metricNameBuf []byte
+	var metricNameRawBuf []byte
 	for i := 0; i < 4e2+1; i++ {
 		var mn MetricName
 		mn.AccountID = uint32((i + 2) % accountsCount)
@@ -696,10 +701,11 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
 		}
 		mn.sortTags()
 		metricNameBuf = mn.Marshal(metricNameBuf[:0])
+		metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
 
 		// Create tsid for the metricName.
 		var tsid TSID
-		if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
+		if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
 			return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
 		}
 		if tsid.AccountID != mn.AccountID {
@@ -1701,6 +1707,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 	now := uint64(timestampFromTime(theDay))
 	baseDate := now / msecPerDay
 	var metricNameBuf []byte
+	var metricNameRawBuf []byte
 	perDayMetricIDs := make(map[uint64]*uint64set.Set)
 	var allMetricIDs uint64set.Set
 	labelNames := []string{
@@ -1733,8 +1740,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
 			mn.sortTags()
 			metricNameBuf = mn.Marshal(metricNameBuf[:0])
+			metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
 
 			var tsid TSID
-			if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
+			if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
 				t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
 			}
 			if tsid.AccountID != accountID {
diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go
index 13118bd9f..1bec53d6b 100644
--- a/lib/storage/index_db_timing_test.go
+++ b/lib/storage/index_db_timing_test.go
@@ -88,6 +88,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
 
 func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
 	var metricName []byte
+	var metricNameRaw []byte
 	is := db.getIndexSearch(0, 0, noDeadline)
 	defer db.putIndexSearch(is)
 	for i := 0; i < recordsPerLoop; i++ {
@@ -97,7 +98,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
 		}
 		mn.sortTags()
 		metricName = mn.Marshal(metricName[:0])
-		if err := is.GetOrCreateTSIDByName(tsid, metricName, 0); err != nil {
+		metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
+		if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
 			panic(fmt.Errorf("cannot insert record: %w", err))
 		}
 	}
@@ -127,6 +129,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
 	const projectID = 893433
 	var mn MetricName
 	var metricName []byte
+	var metricNameRaw []byte
 	var tsid TSID
 	is := db.getIndexSearch(0, 0, noDeadline)
 	defer db.putIndexSearch(is)
@@ -139,7 +142,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
 		mn.AccountID = accountID
 		mn.ProjectID = projectID
 		metricName = mn.Marshal(metricName[:0])
-		if err := is.createTSIDByName(&tsid, metricName, 0); err != nil {
+		metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
+		if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
 			b.Fatalf("cannot insert record: %s", err)
 		}
 	}
@@ -319,6 +323,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 	}
 	var tsid TSID
 	var metricName []byte
+	var metricNameRaw []byte
 	is := db.getIndexSearch(0, 0, noDeadline)
 	defer db.putIndexSearch(is)
 
@@ -327,7 +332,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 		mn.ProjectID = uint32(i % projectsCount)
 		mn.sortTags()
 		metricName = mn.Marshal(metricName[:0])
-		if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err != nil {
+		metricNameRaw = mn.marshalRaw(metricName[:0])
+		if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
 			b.Fatalf("cannot insert record: %s", err)
 		}
 	}
@@ -338,6 +344,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 	b.RunParallel(func(pb *testing.PB) {
 		var tsidLocal TSID
 		var metricNameLocal []byte
+		var metricNameLocalRaw []byte
 		mnLocal := mn
 		is := db.getIndexSearch(0, 0, noDeadline)
 		defer db.putIndexSearch(is)
@@ -347,7 +354,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
 			mnLocal.ProjectID = uint32(i % projectsCount)
 			mnLocal.sortTags()
 			metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
-			if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, 0); err != nil {
+			metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0])
+			if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil {
 				panic(fmt.Errorf("cannot obtain tsid: %w", err))
 			}
 		}
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 51681266a..8dc866d52 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1786,6 +1786,9 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
 	for i := range mrs {
 		mr := &mrs[i]
 		if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+			if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
+				continue
+			}
 			if genTSID.generation == idb.generation {
 				// Fast path - mr.MetricNameRaw has been already registered in the current idb.
 				continue
@@ -1798,7 +1801,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
 		mn.sortTags()
 		metricName = mn.Marshal(metricName[:0])
 		date := uint64(mr.Timestamp) / msecPerDay
-		if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, date); err != nil {
+		if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
 			if errors.Is(err, errSeriesCardinalityExceeded) {
 				continue
 			}
@@ -1871,6 +1874,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 			continue
 		}
 		if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
+			if err := s.registerSeriesCardinality(r.TSID.MetricID, mr.MetricNameRaw); err != nil {
+				j--
+				continue
+			}
 			r.TSID = genTSID.TSID
 			// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
 			// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
@@ -1938,7 +1945,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 		}
 		slowInsertsCount++
 		date := uint64(r.Timestamp) / msecPerDay
-		if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, date); err != nil {
+		if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
 			j--
 			if errors.Is(err, errSeriesCardinalityExceeded) {
 				continue
@@ -1983,26 +1990,29 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 	return nil
 }
 
-func (s *Storage) registerSeriesCardinality(metricID uint64, mn *MetricName) bool {
+func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
 	if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
 		atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
-		logSkippedSeries(mn, "-storage.maxHourlySeries", sl.MaxItems())
-		return false
+		logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
+		return errSeriesCardinalityExceeded
	}
 	if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
 		atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
-		logSkippedSeries(mn, "-storage.maxDailySeries", sl.MaxItems())
-		return false
+		logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
+		return errSeriesCardinalityExceeded
 	}
-	return true
+	return nil
 }
 
-func logSkippedSeries(mn *MetricName, flagName string, flagValue int) {
+var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
+
+func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
 	select {
 	case <-logSkippedSeriesTicker.C:
 		// Do not use logger.WithThrottler() here, since this will result in increased CPU load
 		// because of getUserReadableMetricName() calls per each logSkippedSeries call.
-		logger.Warnf("skip series %s because %s=%d reached", mn, flagName, flagValue)
+		userReadableMetricName := getUserReadableMetricName(metricNameRaw)
+		logger.Warnf("skip series %s because %s=%d reached", userReadableMetricName, flagName, flagValue)
 	default:
 	}
 }
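
For reviewers unfamiliar with the cardinality limiter, the sketch below summarizes the behavior this patch restores: the hourly/daily limiter must also see series that are served from the tsidCache fast path, otherwise the number of active series per hour or day can exceed the configured limit. This is a minimal illustrative sketch, not the actual lib/bloomfilter limiter used by VictoriaMetrics: seriesLimiter, newSeriesLimiter and the standalone registerSeriesCardinality helper are hypothetical stand-ins, while Add and MaxItems only mirror the limiter methods referenced in the diff above.

// Minimal sketch of the restored cardinality-limit accounting.
// NOT the real lib/bloomfilter limiter; types and helpers are hypothetical.
package main

import (
	"fmt"
	"sync"
)

// seriesLimiter stands in for the hourly/daily series limiter.
type seriesLimiter struct {
	mu       sync.Mutex
	maxItems int
	items    map[uint64]struct{}
}

func newSeriesLimiter(maxItems int) *seriesLimiter {
	return &seriesLimiter{
		maxItems: maxItems,
		items:    make(map[uint64]struct{}),
	}
}

// Add registers metricID and reports whether it fits under the limit.
// A metricID that is already tracked is always accepted, so re-registering
// a series served from tsidCache never drops it.
func (sl *seriesLimiter) Add(metricID uint64) bool {
	sl.mu.Lock()
	defer sl.mu.Unlock()
	if _, ok := sl.items[metricID]; ok {
		return true
	}
	if len(sl.items) >= sl.maxItems {
		return false
	}
	sl.items[metricID] = struct{}{}
	return true
}

// MaxItems returns the configured limit, as used in the skipped-series log message.
func (sl *seriesLimiter) MaxItems() int { return sl.maxItems }

var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")

// registerSeriesCardinality mirrors the shape of the patched Storage method:
// after this patch it is called on the tsidCache fast path as well, so the
// limiter sees every active series, not only newly created ones.
func registerSeriesCardinality(sl *seriesLimiter, metricID uint64) error {
	if sl != nil && !sl.Add(metricID) {
		return errSeriesCardinalityExceeded
	}
	return nil
}

func main() {
	sl := newSeriesLimiter(2)
	fmt.Println(registerSeriesCardinality(sl, 1)) // <nil>: first series, under the limit
	fmt.Println(registerSeriesCardinality(sl, 2)) // <nil>: second series, reaches the limit
	fmt.Println(registerSeriesCardinality(sl, 1)) // <nil>: already registered series is still accepted
	fmt.Println(registerSeriesCardinality(sl, 3)) // error: a third distinct series exceeds the limit
}

Because Add accepts metricIDs that are already tracked, calling the helper on the cached-TSID path enforces the limit for new series without dropping series that were registered earlier in the same hour or day.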