lib/storage: properly take into account already registered series when -storage.maxHourlySeries or -storage.maxDailySeries limits are enabled

Commit 5fb45173ae takes into account only newly registered series
when applying cardinality limits. This means that the cardinality limit could be exceeded by already registered series.
This commit restores the accounting for already registered series when applying cardinality limits.
This commit is contained in:
Aliaksandr Valialkin 2022-06-20 13:47:43 +03:00
parent 4b4f03fa1f
commit b958fc7846
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 52 additions and 28 deletions

View file

@ -521,7 +521,7 @@ type indexSearch struct {
//
// It also registers the metricName in global and per-day indexes
// for the given date if the metricName->TSID entry is missing in the index.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date uint64) error {
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// A hack: skip searching for the TSID after many serial misses.
// This should improve insertion performance for big batches
// of new time series.
@ -529,7 +529,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
err := is.getTSIDByMetricName(dst, metricName)
if err == nil {
is.tsidByNameMisses = 0
return nil
return is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw)
}
if err != io.EOF {
return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)
@ -546,7 +546,7 @@ func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte, date
// TSID for the given name wasn't found. Create it.
// It is OK if duplicate TSID for mn is created by concurrent goroutines.
// Metric results will be merged by mn after TableSearch.
if err := is.createTSIDByName(dst, metricName, date); err != nil {
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)
}
return nil
@ -577,7 +577,7 @@ func (db *indexDB) putIndexSearch(is *indexSearch) {
db.indexSearchPool.Put(is)
}
func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint64) error {
func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
mn := GetMetricName()
defer PutMetricName(mn)
if err := mn.Unmarshal(metricName); err != nil {
@ -588,8 +588,8 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
if err != nil {
return fmt.Errorf("cannot generate TSID: %w", err)
}
if !is.db.s.registerSeriesCardinality(dst.MetricID, mn) {
return errSeriesCardinalityExceeded
if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
if err := is.createGlobalIndexes(dst, mn); err != nil {
return fmt.Errorf("cannot create global indexes: %w", err)
@ -611,8 +611,6 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName []byte, date uint6
return nil
}
var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
// SetLogNewSeries updates new series logging.
//
// This function must be called before calling any storage functions.

View file

@ -604,7 +604,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...)
mn.sortTags()
metricName := mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
metricNameRaw := mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup")
}
@ -617,7 +618,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
}}
mn.sortTags()
metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key")
}
@ -630,7 +632,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
}}
mn.sortTags()
metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value")
}
@ -645,7 +648,8 @@ func testIndexDBBigMetricName(db *indexDB) error {
}
mn.sortTags()
metricName = mn.Marshal(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err == nil {
metricNameRaw = mn.marshalRaw(nil)
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil {
return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags")
}
@ -661,6 +665,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
defer db.putIndexSearch(is)
var metricNameBuf []byte
var metricNameRawBuf []byte
for i := 0; i < 4e2+1; i++ {
var mn MetricName
@ -676,10 +681,11 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, metricGroups int) ([]MetricNa
}
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
// Create tsid for the metricName.
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %w", &mn, err)
}
@ -1631,6 +1637,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
now := uint64(timestampFromTime(theDay))
baseDate := now / msecPerDay
var metricNameBuf []byte
var metricNameRawBuf []byte
perDayMetricIDs := make(map[uint64]*uint64set.Set)
var allMetricIDs uint64set.Set
labelNames := []string{
@ -1661,8 +1668,9 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
mn.sortTags()
metricNameBuf = mn.Marshal(metricNameBuf[:0])
metricNameRawBuf = mn.marshalRaw(metricNameRawBuf[:0])
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, 0); err != nil {
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf, metricNameRawBuf, 0); err != nil {
t.Fatalf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
}
mns = append(mns, mn)

View file

@ -84,6 +84,7 @@ func BenchmarkIndexDBAddTSIDs(b *testing.B) {
func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffset, recordsPerLoop int) {
var metricName []byte
var metricNameRaw []byte
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsPerLoop; i++ {
@ -93,7 +94,8 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(tsid, metricName, 0); err != nil {
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.GetOrCreateTSIDByName(tsid, metricName, metricNameRaw, 0); err != nil {
panic(fmt.Errorf("cannot insert record: %w", err))
}
}
@ -121,6 +123,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
// Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66
var mn MetricName
var metricName []byte
var metricNameRaw []byte
var tsid TSID
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
@ -131,7 +134,8 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.createTSIDByName(&tsid, metricName, 0); err != nil {
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.createTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
b.Fatalf("cannot insert record: %s", err)
}
}
@ -309,13 +313,15 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
}
var tsid TSID
var metricName []byte
var metricNameRaw []byte
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
for i := 0; i < recordsCount; i++ {
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName, 0); err != nil {
// Marshal the raw name into its own reusable buffer. Passing metricName[:0] here
// would let marshalRaw write into the buffer that holds the marshaled metricName
// used by the call below, and metricNameRaw would never be reused across iterations.
metricNameRaw = mn.marshalRaw(metricNameRaw[:0])
if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err != nil {
	b.Fatalf("cannot insert record: %s", err)
}
}
@ -326,6 +332,7 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
var tsidLocal TSID
var metricNameLocal []byte
var metricNameLocalRaw []byte
mnLocal := mn
is := db.getIndexSearch(noDeadline)
defer db.putIndexSearch(is)
@ -333,7 +340,8 @@ func BenchmarkIndexDBGetTSIDs(b *testing.B) {
for i := 0; i < recordsPerLoop; i++ {
mnLocal.sortTags()
metricNameLocal = mnLocal.Marshal(metricNameLocal[:0])
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, 0); err != nil {
metricNameLocalRaw = mnLocal.marshalRaw(metricNameLocalRaw[:0])
if err := is.GetOrCreateTSIDByName(&tsidLocal, metricNameLocal, metricNameLocalRaw, 0); err != nil {
panic(fmt.Errorf("cannot obtain tsid: %w", err))
}
}

View file

@ -1677,6 +1677,9 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
for i := range mrs {
mr := &mrs[i]
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
continue
}
if genTSID.generation == idb.generation {
// Fast path - mr.MetricNameRaw has been already registered in the current idb.
continue
@ -1689,7 +1692,7 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
mn.sortTags()
metricName = mn.Marshal(metricName[:0])
date := uint64(mr.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, date); err != nil {
if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName, mr.MetricNameRaw, date); err != nil {
if errors.Is(err, errSeriesCardinalityExceeded) {
continue
}
@ -1762,6 +1765,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
// Register the MetricID of the TSID found in the cache. r.TSID is assigned from
// genTSID.TSID only after this check, so r.TSID.MetricID does not yet identify
// the series named by mr.MetricNameRaw at this point.
if err := s.registerSeriesCardinality(genTSID.TSID.MetricID, mr.MetricNameRaw); err != nil {
	// Drop the row: step j back (the same way the slow path does on error).
	j--
	continue
}
r.TSID = genTSID.TSID
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
@ -1829,7 +1836,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
slowInsertsCount++
date := uint64(r.Timestamp) / msecPerDay
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, date); err != nil {
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
j--
if errors.Is(err, errSeriesCardinalityExceeded) {
continue
@ -1874,26 +1881,29 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
return nil
}
// registerSeriesCardinality registers metricID in the hourly and the daily series limiters
// when they are configured (non-nil).
//
// It returns errSeriesCardinalityExceeded if either limiter refuses metricID. In this case
// the corresponding *SeriesLimitRowsDropped counter is incremented and the series
// is passed to logSkippedSeries. It returns nil otherwise.
func (s *Storage) registerSeriesCardinality(metricID uint64, mn *MetricName) bool {
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) error {
// The hourly limiter is consulted first; the daily limiter is reached
// only if the hourly limiter is missing or has accepted metricID.
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
logSkippedSeries(mn, "-storage.maxHourlySeries", sl.MaxItems())
return false
logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
}
if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
logSkippedSeries(mn, "-storage.maxDailySeries", sl.MaxItems())
return false
logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
return errSeriesCardinalityExceeded
}
return true
return nil
}
func logSkippedSeries(mn *MetricName, flagName string, flagValue int) {
// errSeriesCardinalityExceeded is the sentinel error returned by registerSeriesCardinality
// when a series limiter refuses the series; callers match it with errors.Is.
var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
// logSkippedSeries logs a warning that the series with the given metricNameRaw has been skipped
// because the limit set via flagName (with the value flagValue) has been reached.
//
// The warning is emitted only when a tick is available on logSkippedSeriesTicker;
// otherwise the call returns immediately without logging.
func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
select {
case <-logSkippedSeriesTicker.C:
// Do not use logger.WithThrottler() here, since this will result in increased CPU load
// because of getUserReadableMetricName() calls per each logSkippedSeries call.
logger.Warnf("skip series %s because %s=%d reached", mn, flagName, flagValue)
// Convert the raw name to a readable form only when the message is actually logged.
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
logger.Warnf("skip series %s because %s=%d reached", userReadableMetricName, flagName, flagValue)
default:
// No tick is pending - skip logging.
}
}