From a13d21513edcb8f1526be456a5401b1957292e18 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 3 Dec 2022 23:30:31 -0800 Subject: [PATCH] lib/mergeset: panic when too long item is passed to Table.AddItems() --- lib/mergeset/table.go | 38 ++++++----- lib/mergeset/table_search_test.go | 4 +- lib/mergeset/table_test.go | 14 +--- lib/storage/index_db.go | 43 ++++--------- lib/storage/index_db_test.go | 103 +----------------------------- lib/storage/storage.go | 7 +- 6 files changed, 38 insertions(+), 171 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index f0203eabd..5dbbb4ad9 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -138,12 +138,11 @@ func (riss *rawItemsShards) init() { riss.shards = make([]rawItemsShard, rawItemsShardsPerTable) } -func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error { +func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) { n := atomic.AddUint32(&riss.shardIdx, 1) shards := riss.shards idx := n % uint32(len(shards)) - shard := &shards[idx] - return shard.addItems(tb, items) + shards[idx].addItems(tb, items) } func (riss *rawItemsShards) Len() int { @@ -180,8 +179,7 @@ func (ris *rawItemsShard) Len() int { return n } -func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { - var err error +func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) { var blocksToFlush []*inmemoryBlock ris.mu.Lock() @@ -193,17 +191,18 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { } ib := ibs[len(ibs)-1] for _, item := range items { - if !ib.Add(item) { - ib = getInmemoryBlock() - if !ib.Add(item) { - putInmemoryBlock(ib) - err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item)) - break - } - ibs = append(ibs, ib) - ris.ibs = ibs + if ib.Add(item) { + continue } + ib = getInmemoryBlock() + if ib.Add(item) { + ibs = append(ibs, ib) + continue + } + putInmemoryBlock(ib) + logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item)) } + ris.ibs = ibs if len(ibs) >= maxBlocksPerShard { blocksToFlush = append(blocksToFlush, ibs...) for i := range ibs { @@ -215,7 +214,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { ris.mu.Unlock() tb.mergeRawItemsBlocks(blocksToFlush, false) - return err } type partWrapper struct { @@ -457,17 +455,17 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { } // AddItems adds the given items to the tb. -func (tb *Table) AddItems(items [][]byte) error { - if err := tb.rawItems.addItems(tb, items); err != nil { - return fmt.Errorf("cannot insert data into %q: %w", tb.path, err) - } +// +// The function panics when items contains an item with length exceeding maxInmemoryBlockSize. +// It is caller's responsibility to make sure there are no too long items. +func (tb *Table) AddItems(items [][]byte) { + tb.rawItems.addItems(tb, items) atomic.AddUint64(&tb.itemsAdded, uint64(len(items))) n := 0 for _, item := range items { n += len(item) } atomic.AddUint64(&tb.itemsAddedSizeBytes, uint64(n)) - return nil } // getParts appends parts snapshot to dst and returns it. diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go index 249aa3109..f0ec1f888 100644 --- a/lib/mergeset/table_search_test.go +++ b/lib/mergeset/table_search_test.go @@ -161,9 +161,7 @@ func newTestTable(path string, itemsCount int) (*Table, []string, error) { items := make([]string, itemsCount) for i := 0; i < itemsCount; i++ { item := fmt.Sprintf("%d:%d", rand.Intn(1e9), i) - if err := tb.AddItems([][]byte{[]byte(item)}); err != nil { - return nil, nil, fmt.Errorf("cannot add item: %w", err) - } + tb.AddItems([][]byte{[]byte(item)}) items[i] = item } tb.DebugFlush() diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index 0756a13c7..6a7968537 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -7,8 +7,6 @@ import ( "sync" "sync/atomic" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) func TestTableOpenClose(t *testing.T) { @@ -120,9 +118,7 @@ func testAddItemsSerial(tb *Table, itemsCount int) { if len(item) > maxInmemoryBlockSize { item = item[:maxInmemoryBlockSize] } - if err := tb.AddItems([][]byte{item}); err != nil { - logger.Panicf("BUG: cannot add item to table: %s", err) - } + tb.AddItems([][]byte{item}) } } @@ -146,9 +142,7 @@ func TestTableCreateSnapshotAt(t *testing.T) { const itemsCount = 3e5 for i := 0; i < itemsCount; i++ { item := []byte(fmt.Sprintf("item %d", i)) - if err := tb.AddItems([][]byte{item}); err != nil { - t.Fatalf("cannot add item to table: %s", err) - } + tb.AddItems([][]byte{item}) } tb.DebugFlush() @@ -276,9 +270,7 @@ func testAddItemsConcurrent(tb *Table, itemsCount int) { if len(item) > maxInmemoryBlockSize { item = item[:maxInmemoryBlockSize] } - if err := tb.AddItems([][]byte{item}); err != nil { - logger.Panicf("BUG: cannot add item to table: %s", err) - } + tb.AddItems([][]byte{item}) } }() } diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 8586f0c81..e61abe756 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -408,12 +408,8 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err) } mn.sortTags() - if err := is.createGlobalIndexes(tsid, mn); err != nil { - return false, fmt.Errorf("cannot create global indexes: %w", err) - } - if err := is.createPerDayIndexes(date, tsid.MetricID, mn); err != nil { - return false, fmt.Errorf("cannot create per-day indexes for date=%s: %w", dateToString(date), err) - } + is.createGlobalIndexes(tsid, mn) + is.createPerDayIndexes(date, tsid.MetricID, mn) PutMetricName(mn) atomic.AddUint64(&is.db.timeseriesRepopulated, 1) return true, nil @@ -619,12 +615,8 @@ func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []b if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil { return err } - if err := is.createGlobalIndexes(dst, mn); err != nil { - return fmt.Errorf("cannot create global indexes: %w", err) - } - if err := is.createPerDayIndexes(date, dst.MetricID, mn); err != nil { - return fmt.Errorf("cannot create per-day indexes for date=%s: %w", dateToString(date), err) - } + is.createGlobalIndexes(dst, mn) + is.createPerDayIndexes(date, dst.MetricID, mn) // There is no need in invalidating tag cache, since it is invalidated // on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable. @@ -690,7 +682,7 @@ func generateTSID(dst *TSID, mn *MetricName) { dst.MetricID = generateUniqueMetricID() } -func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error { +func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) { // The order of index items is important. // It guarantees index consistency. @@ -721,7 +713,7 @@ func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) error { ii.registerTagIndexes(prefix.B, mn, tsid.MetricID) kbPool.Put(prefix) - return is.db.tb.AddItems(ii.Items) + is.db.tb.AddItems(ii.Items) } type indexItems struct { @@ -1792,9 +1784,7 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64, accoun // Mark the metricID as deleted, so it will be created again when new data point // for the given time series will arrive. - if err := db.deleteMetricIDs([]uint64{metricID}); err != nil { - return dst, fmt.Errorf("cannot delete metricID for missing metricID->metricName entry; metricID=%d; error: %w", metricID, err) - } + db.deleteMetricIDs([]uint64{metricID}) return dst, io.EOF } @@ -1821,9 +1811,7 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, if err != nil { return 0, err } - if err := db.deleteMetricIDs(metricIDs); err != nil { - return 0, err - } + db.deleteMetricIDs(metricIDs) // Delete TSIDs in the extDB. deletedCount := len(metricIDs) @@ -1841,10 +1829,10 @@ func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, return deletedCount, nil } -func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { +func (db *indexDB) deleteMetricIDs(metricIDs []uint64) { if len(metricIDs) == 0 { // Nothing to delete - return nil + return } // atomically add deleted metricIDs to an inmemory map. @@ -1869,9 +1857,8 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error { items.B = encoding.MarshalUint64(items.B, metricID) items.Next() } - err := db.tb.AddItems(items.Items) + db.tb.AddItems(items.Items) putIndexItems(items) - return err } func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) { @@ -2947,7 +2934,7 @@ const ( int64Max = int64((1 << 63) - 1) ) -func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) error { +func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) { ii := getIndexItems() defer putIndexItems(ii) @@ -2962,12 +2949,8 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs, mn.AccountID, mn.ProjectID) kb.B = encoding.MarshalUint64(kb.B, date) ii.registerTagIndexes(kb.B, mn, metricID) - - if err := is.db.tb.AddItems(ii.Items); err != nil { - return fmt.Errorf("cannot add per-day entires for metricID %d: %w", metricID, err) - } + is.db.tb.AddItems(ii.Items) is.db.s.dateMetricIDCache.Set(date, metricID) - return nil } func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 2746d3c56..46f10f6f1 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -541,22 +541,13 @@ func TestIndexDB(t *testing.T) { } }() - if err := testIndexDBBigMetricName(db); err != nil { - t.Fatalf("unexpected error: %s", err) - } mns, tsids, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups) if err != nil { t.Fatalf("unexpected error: %s", err) } - if err := testIndexDBBigMetricName(db); err != nil { - t.Fatalf("unexpected error: %s", err) - } if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := testIndexDBBigMetricName(db); err != nil { - t.Fatalf("unexpected error: %s", err) - } // Re-open the db and verify it works as expected. db.MustClose() @@ -564,15 +555,9 @@ func TestIndexDB(t *testing.T) { if err != nil { t.Fatalf("cannot open indexDB: %s", err) } - if err := testIndexDBBigMetricName(db); err != nil { - t.Fatalf("unexpected error: %s", err) - } if err := testIndexDBCheckTSIDByName(db, mns, tsids, tenants, false); err != nil { t.Fatalf("unexpected error: %s", err) } - if err := testIndexDBBigMetricName(db); err != nil { - t.Fatalf("unexpected error: %s", err) - } }) t.Run("concurrent", func(t *testing.T) { @@ -595,27 +580,15 @@ func TestIndexDB(t *testing.T) { ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func() { - if err := testIndexDBBigMetricName(db); err != nil { - ch <- err - return - } mns, tsid, tenants, err := testIndexDBGetOrCreateTSIDByName(db, accountsCount, projectsCount, metricGroups) if err != nil { ch <- err return } - if err := testIndexDBBigMetricName(db); err != nil { - ch <- err - return - } if err := testIndexDBCheckTSIDByName(db, mns, tsid, tenants, true); err != nil { ch <- err return } - if err := testIndexDBBigMetricName(db); err != nil { - ch <- err - return - } ch <- nil }() } @@ -636,74 +609,6 @@ func TestIndexDB(t *testing.T) { }) } -func testIndexDBBigMetricName(db *indexDB) error { - var bigBytes []byte - for i := 0; i < 128*1000; i++ { - bigBytes = append(bigBytes, byte(i)) - } - var mn MetricName - var tsid TSID - - is := db.getIndexSearch(0, 0, noDeadline) - defer db.putIndexSearch(is) - - // Try creating too big metric group - mn.Reset() - mn.MetricGroup = append(mn.MetricGroup[:0], bigBytes...) - mn.sortTags() - metricName := mn.Marshal(nil) - metricNameRaw := mn.marshalRaw(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil { - return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big MetricGroup") - } - - // Try creating too big tag key - mn.Reset() - mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...) - mn.Tags = []Tag{{ - Key: append([]byte(nil), bigBytes...), - Value: []byte("foobar"), - }} - mn.sortTags() - metricName = mn.Marshal(nil) - metricNameRaw = mn.marshalRaw(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil { - return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag key") - } - - // Try creating too big tag value - mn.Reset() - mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...) - mn.Tags = []Tag{{ - Key: []byte("foobar"), - Value: append([]byte(nil), bigBytes...), - }} - mn.sortTags() - metricName = mn.Marshal(nil) - metricNameRaw = mn.marshalRaw(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil { - return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too big tag value") - } - - // Try creating metric name with too many tags - mn.Reset() - mn.MetricGroup = append(mn.MetricGroup[:0], "xxx"...) - for i := 0; i < 60000; i++ { - mn.Tags = append(mn.Tags, Tag{ - Key: []byte(fmt.Sprintf("foobar %d", i)), - Value: []byte(fmt.Sprintf("sdfjdslkfj %d", i)), - }) - } - mn.sortTags() - metricName = mn.Marshal(nil) - metricNameRaw = mn.marshalRaw(nil) - if err := is.GetOrCreateTSIDByName(&tsid, metricName, metricNameRaw, 0); err == nil { - return fmt.Errorf("expecting non-nil error on an attempt to insert metric with too many tags") - } - - return nil -} - func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, metricGroups int) ([]MetricName, []TSID, []string, error) { // Create tsids. var mns []MetricName @@ -756,9 +661,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, date := uint64(timestampFromTime(time.Now())) / msecPerDay for i := range tsids { tsid := &tsids[i] - if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil { - return nil, nil, nil, fmt.Errorf("error in createPerDayIndexes(%d, %d): %w", date, tsid.MetricID, err) - } + is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) } // Flush index to disk, so it becomes visible for search @@ -1838,9 +1741,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { for i := range tsids { tsid := &tsids[i] metricIDs.Add(tsid.MetricID) - if err := is.createPerDayIndexes(date, tsid.MetricID, &mns[i]); err != nil { - t.Fatalf("error in createPerDayIndexes(%d, %d): %s", date, tsid.MetricID, err) - } + is.createPerDayIndexes(date, tsid.MetricID, &mns[i]) } allMetricIDs.Union(&metricIDs) perDayMetricIDs[date] = &metricIDs diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3db94dd48..a319efb87 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2196,12 +2196,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { continue } mn.sortTags() - if err := is.createPerDayIndexes(date, metricID, mn); err != nil { - if firstError == nil { - firstError = fmt.Errorf("error when storing per-date inverted index for (date=%s, metricID=%d): %w", dateToString(date), metricID, err) - } - continue - } + is.createPerDayIndexes(date, metricID, mn) } dateMetricIDsForCache = append(dateMetricIDsForCache, dateMetricID{ date: date,