From 4e26ad869b13642a22e6238aa0183740d9683c74 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 23 Sep 2019 20:40:38 +0300 Subject: [PATCH] lib/{storage,mergeset}: verify PrepareBlock callback results Do not touch the first and the last item passed to PrepareBlock in order to preserve sort order of mergeset blocks. --- lib/mergeset/encoding.go | 5 +++ lib/mergeset/merge.go | 44 +++++++++++++++++++---- lib/storage/index_db.go | 67 +++++++++++++++++------------------- lib/storage/index_db_test.go | 29 ++++++++-------- 4 files changed, 89 insertions(+), 56 deletions(-) diff --git a/lib/mergeset/encoding.go b/lib/mergeset/encoding.go index b048d2970..901eebbe1 100644 --- a/lib/mergeset/encoding.go +++ b/lib/mergeset/encoding.go @@ -168,8 +168,13 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo func (ib *inmemoryBlock) debugItemsString() string { var sb strings.Builder + var prevItem []byte for i, item := range ib.items { + if string(item) < string(prevItem) { + fmt.Fprintf(&sb, "!!! the next item is smaller than the previous item !!!\n") + } fmt.Fprintf(&sb, "%05d %X\n", i, item) + prevItem = item } return sb.String() } diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index 398fffa9d..103612b25 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" "sync/atomic" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // PrepareBlockCallback can transform the passed items allocated at the given data. @@ -12,7 +14,7 @@ import ( // The callback is called during merge before flushing full block of the given items // to persistent storage. // -// The callback must return sorted items. +// The callback must return sorted items. The first and the last item must be unchanged. // The callback can re-use data and items for storing the result. 
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte) @@ -58,6 +60,11 @@ type blockStreamMerger struct { ib inmemoryBlock phFirstItemCaught bool + + // These are auxiliary buffers used in flushIB + // for consistency checks after prepareBlock call. + firstItem []byte + lastItem []byte } func (bsm *blockStreamMerger) reset() { @@ -117,14 +124,17 @@ again: nextItem = bsm.bsrHeap[0].bh.firstItem hasNextItem = true } - for bsr.blockItemIdx < len(bsr.Block.items) && (!hasNextItem || string(bsr.Block.items[bsr.blockItemIdx]) <= string(nextItem)) { - if bsm.ib.Add(bsr.Block.items[bsr.blockItemIdx]) { - bsr.blockItemIdx++ + for bsr.blockItemIdx < len(bsr.Block.items) { + item := bsr.Block.items[bsr.blockItemIdx] + if hasNextItem && string(item) > string(nextItem) { + break + } + if !bsm.ib.Add(item) { + // The bsm.ib is full. Flush it to bsw and continue. + bsm.flushIB(bsw, ph, itemsMerged) continue } - - // The bsm.ib is full. Flush it to bsw and continue. - bsm.flushIB(bsw, ph, itemsMerged) + bsr.blockItemIdx++ } if bsr.blockItemIdx == len(bsr.Block.items) { // bsr.Block is fully read. Proceed to the next block. @@ -152,7 +162,27 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it } atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items))) if bsm.prepareBlock != nil { + bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...) + bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...) bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items) + if len(bsm.ib.items) == 0 { + // Nothing to flush + return + } + // Consistency checks after prepareBlock call. 
+ firstItem := bsm.ib.items[0] + if string(firstItem) != string(bsm.firstItem) { + logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem) + } + lastItem := bsm.ib.items[len(bsm.ib.items)-1] + if string(lastItem) != string(bsm.lastItem) { + logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem) + } + // Verify whether the bsm.ib.items are sorted only in tests, since this + // can be an expensive check in prod for items with long common prefix. + if isInTest && !bsm.ib.isSorted() { + logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString()) + } } ph.itemsCount += uint64(len(bsm.ib.items)) if !bsm.phFirstItemCaught { diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index db78034af..de5408d5c 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1639,11 +1639,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, // Fast path: the same tag value found. // There is no need in checking it again with potentially // slow tf.matchSuffix, which may call regexp. - mp.ParseMetricIDs() - loops += len(mp.MetricIDs) + loops += mp.MetricIDsLen() if loops > maxLoops { return errFallbackToMetricNameMatch } + mp.ParseMetricIDs() for _, metricID := range mp.MetricIDs { if !f(metricID) { return nil @@ -1672,11 +1672,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, } prevMatch = true prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...) 
- mp.ParseMetricIDs() - loops += len(mp.MetricIDs) + loops += mp.MetricIDsLen() if loops > maxLoops { return errFallbackToMetricNameMatch } + mp.ParseMetricIDs() for _, metricID := range mp.MetricIDs { if !f(metricID) { return nil @@ -1739,11 +1739,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { return err } - mp.ParseMetricIDs() - loops += len(mp.MetricIDs) + loops += mp.MetricIDsLen() if loops > maxLoops { return errFallbackToMetricNameMatch } + mp.ParseMetricIDs() for _, metricID := range mp.MetricIDs { metricIDs[metricID] = struct{}{} } @@ -1788,11 +1788,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri return nil } sf = sortedFilter - mp.ParseMetricIDs() - loops += len(mp.MetricIDs) + loops += mp.MetricIDsLen() if loops > maxLoops { return errFallbackToMetricNameMatch } + mp.ParseMetricIDs() for _, metricID = range mp.MetricIDs { if len(sf) == 0 { break @@ -2185,6 +2185,11 @@ func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) { return firstMetricID, lastMetricID } +// MetricIDsLen returns the number of MetricIDs in the mp.tail +func (mp *tagToMetricIDsRowParser) MetricIDsLen() int { + return len(mp.tail) / 8 +} + // ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs. func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { tail := mp.tail @@ -2242,14 +2247,14 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { // items contain at least one tag->metricIDs row. Merge rows with common tag. 
dstData := data[:0] dstItems := items[:0] - tmm := getTagToMetricIDsRowsMerger() - defer putTagToMetricIDsRowsMerger(tmm) - mp := &tmm.mp mpPrev := &tmm.mpPrev - for _, item := range items { - if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs { + for i, item := range items { + if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs || i == 0 || i == len(items)-1 { + // Write rows other than tag->metricIDs as-is. + // Additionally write the first and the last row as-is in order to preserve + // sort order for adjacent blocks. if len(tmm.pendingMetricIDs) > 0 { dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) } @@ -2270,7 +2275,8 @@ if len(tmm.pendingMetricIDs) > 0 { dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) } - return dstData, dstItems + putTagToMetricIDsRowsMerger(tmm) + return data, dstItems } type uint64Sorter []uint64 @@ -2289,34 +2295,27 @@ type tagToMetricIDsRowsMerger struct { mpPrev tagToMetricIDsRowParser } +func (tmm *tagToMetricIDsRowsMerger) Reset() { + tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] + tmm.mp.Reset() + tmm.mpPrev.Reset() +} + func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) { if len(tmm.pendingMetricIDs) == 0 { logger.Panicf("BUG: pendingMetricIDs must be non-empty") } + // Use sort.Sort instead of sort.Slice in order to reduce memory allocations. + sort.Sort(&tmm.pendingMetricIDs) + dstDataLen := len(dstData) dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs) dstData = mp.Tag.Marshal(dstData) - // Use sort.Sort instead of sort.Slice in order to reduce memory allocations - sort.Sort(&tmm.pendingMetricIDs) - pendingMetricIDs := tmm.pendingMetricIDs - if len(dstItems) == 0 { - // Put the first item with a single metricID, since this item goes into index, so it must be short. 
- dstData = encoding.MarshalUint64(dstData, pendingMetricIDs[0]) - dstItems = append(dstItems, dstData[dstDataLen:]) - pendingMetricIDs = pendingMetricIDs[1:] - if len(pendingMetricIDs) == 0 { - tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] - return dstData, dstItems - } - dstDataLen = len(dstData) - dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs) - dstData = mp.Tag.Marshal(dstData) - } - for _, metricID := range pendingMetricIDs { + for _, metricID := range tmm.pendingMetricIDs { dstData = encoding.MarshalUint64(dstData, metricID) } - tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] dstItems = append(dstItems, dstData[dstDataLen:]) + tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] return dstData, dstItems } @@ -2329,9 +2328,7 @@ func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger { } func putTagToMetricIDsRowsMerger(tmm *tagToMetricIDsRowsMerger) { - tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0] - tmm.mp.Reset() - tmm.mpPrev.Reset() + tmm.Reset() tmmPool.Put(tmm) } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 744efe899..67979dbe8 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -280,6 +280,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, is := db.getIndexSearch() defer db.putIndexSearch(is) + var metricNameBuf []byte for i := 0; i < 4e2+1; i++ { var mn MetricName @@ -294,11 +295,11 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, mn.AddTag(key, value) } mn.sortTags() - metricName := mn.Marshal(nil) + metricNameBuf = mn.Marshal(metricNameBuf[:0]) // Create tsid for the metricName. 
var tsid TSID - if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil { + if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil { return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err) } @@ -306,22 +307,22 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount, tsids = append(tsids, tsid) } + // fill Date -> MetricID cache + date := uint64(timestampFromTime(time.Now())) / msecPerDay + for i := range tsids { + tsid := &tsids[i] + if err := db.storeDateMetricID(date, tsid.MetricID); err != nil { + return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) + } + } + + // Flush index to disk, so it becomes visible for search db.tb.DebugFlush() return mns, tsids, nil } func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isConcurrent bool) error { - // fill Date -> MetricID cache - date := uint64(timestampFromTime(time.Now())) / msecPerDay - for i := range tsids { - tsid := &tsids[i] - if err := db.storeDateMetricID(date, tsid.MetricID); err != nil { - return fmt.Errorf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err) - } - } - db.tb.DebugFlush() - hasValue := func(tvs []string, v []byte) bool { for _, tv := range tvs { if string(v) == tv { @@ -361,7 +362,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC var err error metricNameCopy, err = db.searchMetricName(metricNameCopy[:0], tsidCopy.MetricID) if err != nil { - return fmt.Errorf("error in searchMetricName: %s", err) + return fmt.Errorf("error in searchMetricName for metricID=%d; i=%d: %s", tsidCopy.MetricID, i, err) } if !bytes.Equal(metricName, metricNameCopy) { return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName) @@ -451,7 +452,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC return fmt.Errorf("cannot search 
by exact tag filter: %s", err) } if !testHasTSID(tsidsFound, tsid) { - return fmt.Errorf("tsids is missing in exact tsidsFound\ntsid=%+v\ntsidsFound=%+v\ntfs=%s\nmn=%s", tsid, tsidsFound, tfs, mn) + return fmt.Errorf("tsids is missing in exact tsidsFound\ntsid=%+v\ntsidsFound=%+v\ntfs=%s\nmn=%s\ni=%d", tsid, tsidsFound, tfs, mn, i) } // Verify tag cache.