lib/{storage,mergeset}: verify PrepareBlock callback results

Do not touch the first and the last item passed to PrepareBlock
in order to preserve sort order of mergeset blocks.
This commit is contained in:
Aliaksandr Valialkin 2019-09-23 20:40:38 +03:00
parent 0772191975
commit 4e26ad869b
4 changed files with 89 additions and 56 deletions

View file

@ -168,8 +168,13 @@ func (ib *inmemoryBlock) MarshalSortedData(sb *storageBlock, firstItemDst, commo
// debugItemsString returns a human-readable dump of ib.items for debugging.
// Each item is printed as an index plus hex bytes; a warning line is emitted
// before any item that sorts below its predecessor, i.e. a sort-order violation.
func (ib *inmemoryBlock) debugItemsString() string {
	var out strings.Builder
	prev := []byte(nil)
	for idx, cur := range ib.items {
		// Items are expected to be in ascending order; flag any inversion.
		if string(cur) < string(prev) {
			fmt.Fprintf(&out, "!!! the next item is smaller than the previous item !!!\n")
		}
		fmt.Fprintf(&out, "%05d %X\n", idx, cur)
		prev = cur
	}
	return out.String()
}

View file

@ -5,6 +5,8 @@ import (
"fmt"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// PrepareBlockCallback can transform the passed items allocated at the given data.
@ -12,7 +14,7 @@ import (
// The callback is called during merge before flushing full block of the given items
// to persistent storage.
//
// The callback must return sorted items.
// The callback must return sorted items. The first and the last item must be unchanged.
// The callback can re-use data and items for storing the result.
type PrepareBlockCallback func(data []byte, items [][]byte) ([]byte, [][]byte)
@ -58,6 +60,11 @@ type blockStreamMerger struct {
ib inmemoryBlock
phFirstItemCaught bool
// These are auxiliary buffers used in flushIB
// for consistency checks after prepareBlock call.
firstItem []byte
lastItem []byte
}
func (bsm *blockStreamMerger) reset() {
@ -117,14 +124,17 @@ again:
nextItem = bsm.bsrHeap[0].bh.firstItem
hasNextItem = true
}
for bsr.blockItemIdx < len(bsr.Block.items) && (!hasNextItem || string(bsr.Block.items[bsr.blockItemIdx]) <= string(nextItem)) {
if bsm.ib.Add(bsr.Block.items[bsr.blockItemIdx]) {
bsr.blockItemIdx++
for bsr.blockItemIdx < len(bsr.Block.items) {
item := bsr.Block.items[bsr.blockItemIdx]
if hasNextItem && string(item) > string(nextItem) {
break
}
if !bsm.ib.Add(item) {
// The bsm.ib is full. Flush it to bsw and continue.
bsm.flushIB(bsw, ph, itemsMerged)
continue
}
// The bsm.ib is full. Flush it to bsw and continue.
bsm.flushIB(bsw, ph, itemsMerged)
bsr.blockItemIdx++
}
if bsr.blockItemIdx == len(bsr.Block.items) {
// bsr.Block is fully read. Proceed to the next block.
@ -152,7 +162,27 @@ func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, it
}
atomic.AddUint64(itemsMerged, uint64(len(bsm.ib.items)))
if bsm.prepareBlock != nil {
bsm.firstItem = append(bsm.firstItem[:0], bsm.ib.items[0]...)
bsm.lastItem = append(bsm.lastItem[:0], bsm.ib.items[len(bsm.ib.items)-1]...)
bsm.ib.data, bsm.ib.items = bsm.prepareBlock(bsm.ib.data, bsm.ib.items)
if len(bsm.ib.items) == 0 {
// Nothing to flush
return
}
// Consistency checks after prepareBlock call.
firstItem := bsm.ib.items[0]
if string(firstItem) != string(bsm.firstItem) {
logger.Panicf("BUG: prepareBlock must return first item equal to the original first item;\ngot\n%X\nwant\n%X", firstItem, bsm.firstItem)
}
lastItem := bsm.ib.items[len(bsm.ib.items)-1]
if string(lastItem) != string(bsm.lastItem) {
logger.Panicf("BUG: prepareBlock must return last item equal to the original last item;\ngot\n%X\nwant\n%X", lastItem, bsm.lastItem)
}
// Verify whether the bsm.ib.items are sorted only in tests, since this
// can be expensive check in prod for items with long common prefix.
if isInTest && !bsm.ib.isSorted() {
logger.Panicf("BUG: prepareBlock must return sorted items;\ngot\n%s", bsm.ib.debugItemsString())
}
}
ph.itemsCount += uint64(len(bsm.ib.items))
if !bsm.phFirstItemCaught {

View file

@ -1639,11 +1639,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
// slow tf.matchSuffix, which may call regexp.
mp.ParseMetricIDs()
loops += len(mp.MetricIDs)
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
@ -1672,11 +1672,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
}
prevMatch = true
prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
mp.ParseMetricIDs()
loops += len(mp.MetricIDs)
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
if !f(metricID) {
return nil
@ -1739,11 +1739,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return err
}
mp.ParseMetricIDs()
loops += len(mp.MetricIDs)
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
metricIDs[metricID] = struct{}{}
}
@ -1788,11 +1788,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
return nil
}
sf = sortedFilter
mp.ParseMetricIDs()
loops += len(mp.MetricIDs)
loops += mp.MetricIDsLen()
if loops > maxLoops {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID = range mp.MetricIDs {
if len(sf) == 0 {
break
@ -2185,6 +2185,11 @@ func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) {
return firstMetricID, lastMetricID
}
// MetricIDsLen returns the number of MetricIDs encoded in mp.tail.
//
// It is cheaper than ParseMetricIDs followed by len(mp.MetricIDs),
// since it does not decode anything.
func (mp *tagToMetricIDsRowParser) MetricIDsLen() int {
	// Each metricID is a marshaled uint64, i.e. 8 bytes.
	const metricIDSize = 8
	return len(mp.tail) / metricIDSize
}
// ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs.
func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
tail := mp.tail
@ -2242,14 +2247,14 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
// items contain at least one tag->metricIDs row. Merge rows with common tag.
dstData := data[:0]
dstItems := items[:0]
tmm := getTagToMetricIDsRowsMerger()
defer putTagToMetricIDsRowsMerger(tmm)
mp := &tmm.mp
mpPrev := &tmm.mpPrev
for _, item := range items {
if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs {
for i, item := range items {
if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs || i == 0 || i == len(items)-1 {
// Write rows other than tag->metricIDs as-is.
// Additionally write the first and the last row as-is in order to preserve
// sort order for adjacent blocks.
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
@ -2270,7 +2275,8 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
if len(tmm.pendingMetricIDs) > 0 {
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
}
return dstData, dstItems
putTagToMetricIDsRowsMerger(tmm)
return data, dstItems
}
type uint64Sorter []uint64
@ -2289,34 +2295,27 @@ type tagToMetricIDsRowsMerger struct {
mpPrev tagToMetricIDsRowParser
}
// Reset clears tmm state, so tmm can be safely reused (e.g. via a pool).
func (tmm *tagToMetricIDsRowsMerger) Reset() {
	tmm.mp.Reset()
	tmm.mpPrev.Reset()
	// Keep the underlying capacity of pendingMetricIDs for reuse.
	tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
}
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
if len(tmm.pendingMetricIDs) == 0 {
logger.Panicf("BUG: pendingMetricIDs must be non-empty")
}
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
sort.Sort(&tmm.pendingMetricIDs)
dstDataLen := len(dstData)
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs)
dstData = mp.Tag.Marshal(dstData)
// Use sort.Sort instead of sort.Slice in order to reduce memory allocations
sort.Sort(&tmm.pendingMetricIDs)
pendingMetricIDs := tmm.pendingMetricIDs
if len(dstItems) == 0 {
// Put the first item with a single metricID, since this item goes into the index and must be short.
dstData = encoding.MarshalUint64(dstData, pendingMetricIDs[0])
dstItems = append(dstItems, dstData[dstDataLen:])
pendingMetricIDs = pendingMetricIDs[1:]
if len(pendingMetricIDs) == 0 {
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
return dstData, dstItems
}
dstDataLen = len(dstData)
dstData = marshalCommonPrefix(dstData, nsPrefixTagToMetricIDs)
dstData = mp.Tag.Marshal(dstData)
}
for _, metricID := range pendingMetricIDs {
for _, metricID := range tmm.pendingMetricIDs {
dstData = encoding.MarshalUint64(dstData, metricID)
}
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
dstItems = append(dstItems, dstData[dstDataLen:])
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
return dstData, dstItems
}
@ -2329,9 +2328,7 @@ func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger {
}
// putTagToMetricIDsRowsMerger returns tmm to tmmPool after resetting it,
// so the next getTagToMetricIDsRowsMerger caller obtains clean state.
func putTagToMetricIDsRowsMerger(tmm *tagToMetricIDsRowsMerger) {
	// tmm.Reset() already truncates pendingMetricIDs and resets mp and mpPrev;
	// do not duplicate those field resets here.
	tmm.Reset()
	tmmPool.Put(tmm)
}

View file

@ -280,6 +280,7 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
is := db.getIndexSearch()
defer db.putIndexSearch(is)
var metricNameBuf []byte
for i := 0; i < 4e2+1; i++ {
var mn MetricName
@ -294,11 +295,11 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
mn.AddTag(key, value)
}
mn.sortTags()
metricName := mn.Marshal(nil)
metricNameBuf = mn.Marshal(metricNameBuf[:0])
// Create tsid for the metricName.
var tsid TSID
if err := is.GetOrCreateTSIDByName(&tsid, metricName); err != nil {
if err := is.GetOrCreateTSIDByName(&tsid, metricNameBuf); err != nil {
return nil, nil, fmt.Errorf("unexpected error when creating tsid for mn:\n%s: %s", &mn, err)
}
@ -306,22 +307,22 @@ func testIndexDBGetOrCreateTSIDByName(db *indexDB, accountsCount, projectsCount,
tsids = append(tsids, tsid)
}
// fill Date -> MetricID cache
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
if err := db.storeDateMetricID(date, tsid.MetricID); err != nil {
return nil, nil, fmt.Errorf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}
// Flush index to disk, so it becomes visible for search
db.tb.DebugFlush()
return mns, tsids, nil
}
func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isConcurrent bool) error {
// fill Date -> MetricID cache
date := uint64(timestampFromTime(time.Now())) / msecPerDay
for i := range tsids {
tsid := &tsids[i]
if err := db.storeDateMetricID(date, tsid.MetricID); err != nil {
return fmt.Errorf("error in storeDateMetricID(%d, %d): %s", date, tsid.MetricID, err)
}
}
db.tb.DebugFlush()
hasValue := func(tvs []string, v []byte) bool {
for _, tv := range tvs {
if string(v) == tv {
@ -361,7 +362,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
var err error
metricNameCopy, err = db.searchMetricName(metricNameCopy[:0], tsidCopy.MetricID)
if err != nil {
return fmt.Errorf("error in searchMetricName: %s", err)
return fmt.Errorf("error in searchMetricName for metricID=%d; i=%d: %s", tsidCopy.MetricID, i, err)
}
if !bytes.Equal(metricName, metricNameCopy) {
return fmt.Errorf("unexpected mn for metricID=%d;\ngot\n%q\nwant\n%q", tsidCopy.MetricID, metricNameCopy, metricName)
@ -451,7 +452,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return fmt.Errorf("cannot search by exact tag filter: %s", err)
}
if !testHasTSID(tsidsFound, tsid) {
return fmt.Errorf("tsids is missing in exact tsidsFound\ntsid=%+v\ntsidsFound=%+v\ntfs=%s\nmn=%s", tsid, tsidsFound, tfs, mn)
return fmt.Errorf("tsids is missing in exact tsidsFound\ntsid=%+v\ntsidsFound=%+v\ntfs=%s\nmn=%s\ni=%d", tsid, tsidsFound, tfs, mn, i)
}
// Verify tag cache.