mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: add tests for mergeTagToMetricIDsRows and return the original items if the function breaks items` ordering.
This should save from data corruption issues revealed in the previous releases up to v1.28.0-beta5.
This commit is contained in:
parent
956fdd89d3
commit
c1cf7d9f93
2 changed files with 326 additions and 8 deletions
|
@ -2229,7 +2229,8 @@ func (mp *tagToMetricIDsRowParser) IsDeletedTag(dmis *uint64set.Set) bool {
|
|||
func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
|
||||
// Perform quick checks whether items contain tag->metricIDs rows
|
||||
// based on the fact that items are sorted.
|
||||
if len(items) == 0 {
|
||||
if len(items) <= 2 {
|
||||
// The first and the last row must remain unchanged.
|
||||
return data, items
|
||||
}
|
||||
firstItem := items[0]
|
||||
|
@ -2242,11 +2243,13 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
|
|||
}
|
||||
|
||||
// items contain at least one tag->metricIDs row. Merge rows with common tag.
|
||||
dstData := data[:0]
|
||||
dstItems := items[:0]
|
||||
tmm := getTagToMetricIDsRowsMerger()
|
||||
tmm.dataCopy = append(tmm.dataCopy[:0], data...)
|
||||
tmm.itemsCopy = append(tmm.itemsCopy[:0], items...)
|
||||
mp := &tmm.mp
|
||||
mpPrev := &tmm.mpPrev
|
||||
dstData := data[:0]
|
||||
dstItems := items[:0]
|
||||
for i, item := range items {
|
||||
if len(item) == 0 || item[0] != nsPrefixTagToMetricIDs || i == 0 || i == len(items)-1 {
|
||||
// Write rows other than tag->metricIDs as-is.
|
||||
|
@ -2281,24 +2284,29 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
|
|||
}
|
||||
}
|
||||
if len(tmm.pendingMetricIDs) > 0 {
|
||||
dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
|
||||
logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs)
|
||||
}
|
||||
if err := checkItemsSorted(dstItems); err != nil {
|
||||
logger.Errorf("please report this error at https://github.com/VictoriaMetrics/VictoriaMetrics/issues : %s", err)
|
||||
dstData = append(dstData[:0], tmm.dataCopy...)
|
||||
dstItems = append(dstItems[:0], tmm.itemsCopy...)
|
||||
}
|
||||
putTagToMetricIDsRowsMerger(tmm)
|
||||
assertItemsSorted(dstItems)
|
||||
return dstData, dstItems
|
||||
}
|
||||
|
||||
func assertItemsSorted(items [][]byte) {
|
||||
func checkItemsSorted(items [][]byte) error {
|
||||
if len(items) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
prevItem := items[0]
|
||||
for _, currItem := range items[1:] {
|
||||
if string(prevItem) > string(currItem) {
|
||||
logger.Panicf("BUG: items aren't sorted: prevItem > currItem\nprevItem=%X\ncurrItem=%X\nitems=%X", prevItem, currItem, items)
|
||||
return fmt.Errorf("items aren't sorted: prevItem > currItem\nprevItem=%X\ncurrItem=%X\nitems=%X", prevItem, currItem, items)
|
||||
}
|
||||
prevItem = currItem
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxMetricIDsPerRow limits the number of metricIDs in tag->metricIDs row.
|
||||
|
@ -2320,12 +2328,18 @@ type tagToMetricIDsRowsMerger struct {
|
|||
pendingMetricIDs uint64Sorter
|
||||
mp tagToMetricIDsRowParser
|
||||
mpPrev tagToMetricIDsRowParser
|
||||
|
||||
itemsCopy [][]byte
|
||||
dataCopy []byte
|
||||
}
|
||||
|
||||
func (tmm *tagToMetricIDsRowsMerger) Reset() {
|
||||
tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
|
||||
tmm.mp.Reset()
|
||||
tmm.mpPrev.Reset()
|
||||
|
||||
tmm.itemsCopy = tmm.itemsCopy[:0]
|
||||
tmm.dataCopy = tmm.dataCopy[:0]
|
||||
}
|
||||
|
||||
func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems [][]byte, mp *tagToMetricIDsRowParser) ([]byte, [][]byte) {
|
||||
|
|
|
@ -12,11 +12,315 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
|
||||
)
|
||||
|
||||
func TestMergeTagToMetricIDsRows(t *testing.T) {
|
||||
f := func(items []string, expectedItems []string) {
|
||||
t.Helper()
|
||||
var data []byte
|
||||
var itemsB [][]byte
|
||||
for _, item := range items {
|
||||
data = append(data, item...)
|
||||
itemsB = append(itemsB, data[len(data)-len(item):])
|
||||
}
|
||||
if err := checkItemsSorted(itemsB); err != nil {
|
||||
t.Fatalf("source items aren't sorted: %s", err)
|
||||
}
|
||||
resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB)
|
||||
if len(resultItemsB) != len(expectedItems) {
|
||||
t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems))
|
||||
}
|
||||
if err := checkItemsSorted(resultItemsB); err != nil {
|
||||
t.Fatalf("result items aren't sorted: %s", err)
|
||||
}
|
||||
for i, item := range resultItemsB {
|
||||
if !bytes.HasPrefix(resultData, item) {
|
||||
t.Fatalf("unexpected prefix for resultData #%d;\ngot\n%X\nwant\n%X", i, resultData, item)
|
||||
}
|
||||
resultData = resultData[len(item):]
|
||||
}
|
||||
if len(resultData) != 0 {
|
||||
t.Fatalf("unexpected tail left in resultData: %X", resultData)
|
||||
}
|
||||
var resultItems []string
|
||||
for _, item := range resultItemsB {
|
||||
resultItems = append(resultItems, string(item))
|
||||
}
|
||||
if !reflect.DeepEqual(expectedItems, resultItems) {
|
||||
t.Fatalf("unexpected items;\ngot\n%X\nwant\n%X", resultItems, expectedItems)
|
||||
}
|
||||
}
|
||||
x := func(key, value string, metricIDs []uint64) string {
|
||||
dst := marshalCommonPrefix(nil, nsPrefixTagToMetricIDs)
|
||||
t := &Tag{
|
||||
Key: []byte(key),
|
||||
Value: []byte(value),
|
||||
}
|
||||
dst = t.Marshal(dst)
|
||||
for _, metricID := range metricIDs {
|
||||
dst = encoding.MarshalUint64(dst, metricID)
|
||||
}
|
||||
return string(dst)
|
||||
}
|
||||
|
||||
f(nil, nil)
|
||||
f([]string{}, nil)
|
||||
f([]string{"foo"}, []string{"foo"})
|
||||
f([]string{"a", "b", "c", "def"}, []string{"a", "b", "c", "def"})
|
||||
f([]string{"\x00", "\x00b", "\x00c", "\x00def"}, []string{"\x00", "\x00b", "\x00c", "\x00def"})
|
||||
f([]string{
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
}, []string{
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
})
|
||||
f([]string{
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
"xyz",
|
||||
}, []string{
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
"xyz",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
x("", "", []uint64{0}),
|
||||
"xyz",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{0}),
|
||||
"xyz",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("", "", []uint64{2}),
|
||||
x("", "", []uint64{3}),
|
||||
x("", "", []uint64{4}),
|
||||
"xyz",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1, 2, 3, 4}),
|
||||
"xyz",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("", "", []uint64{2}),
|
||||
x("", "", []uint64{3}),
|
||||
x("", "", []uint64{4}),
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1, 2, 3}),
|
||||
x("", "", []uint64{4}),
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("", "", []uint64{2, 3, 4}),
|
||||
x("", "", []uint64{2, 3, 4, 5}),
|
||||
x("", "", []uint64{3, 5}),
|
||||
"foo",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1, 2, 3, 4, 5}),
|
||||
"foo",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("", "a", []uint64{2, 3, 4}),
|
||||
x("", "a", []uint64{2, 3, 4, 5}),
|
||||
x("", "b", []uint64{3, 5}),
|
||||
"foo",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("", "a", []uint64{2, 3, 4, 5}),
|
||||
x("", "b", []uint64{3, 5}),
|
||||
"foo",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("x", "a", []uint64{2, 3, 4}),
|
||||
x("y", "", []uint64{2, 3, 4, 5}),
|
||||
x("y", "x", []uint64{3, 5}),
|
||||
"foo",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("", "", []uint64{1}),
|
||||
x("x", "a", []uint64{2, 3, 4}),
|
||||
x("y", "", []uint64{2, 3, 4, 5}),
|
||||
x("y", "x", []uint64{3, 5}),
|
||||
"foo",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("sdf", "aa", []uint64{1, 1, 3}),
|
||||
x("sdf", "aa", []uint64{1, 2}),
|
||||
"foo",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("sdf", "aa", []uint64{1, 2, 3}),
|
||||
"foo",
|
||||
})
|
||||
f([]string{
|
||||
"\x00asdf",
|
||||
x("sdf", "aa", []uint64{1, 2, 2, 4}),
|
||||
x("sdf", "aa", []uint64{1, 2, 3}),
|
||||
"foo",
|
||||
}, []string{
|
||||
"\x00asdf",
|
||||
x("sdf", "aa", []uint64{1, 2, 3, 4}),
|
||||
"foo",
|
||||
})
|
||||
|
||||
// Construct big source chunks
|
||||
var metricIDs []uint64
|
||||
|
||||
metricIDs = metricIDs[:0]
|
||||
for i := 0; i < maxMetricIDsPerRow-1; i++ {
|
||||
metricIDs = append(metricIDs, uint64(i))
|
||||
}
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
})
|
||||
|
||||
metricIDs = metricIDs[:0]
|
||||
for i := 0; i < maxMetricIDsPerRow; i++ {
|
||||
metricIDs = append(metricIDs, uint64(i))
|
||||
}
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
})
|
||||
|
||||
metricIDs = metricIDs[:0]
|
||||
for i := 0; i < 3*maxMetricIDsPerRow; i++ {
|
||||
metricIDs = append(metricIDs, uint64(i))
|
||||
}
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
})
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", []uint64{0, 0, 1, 2, 3}),
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", []uint64{0, 1, 2, 3}),
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
})
|
||||
|
||||
// Check for duplicate metricIDs removal
|
||||
metricIDs = metricIDs[:0]
|
||||
for i := 0; i < maxMetricIDsPerRow-1; i++ {
|
||||
metricIDs = append(metricIDs, 123)
|
||||
}
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", metricIDs),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", []uint64{123}),
|
||||
"x",
|
||||
})
|
||||
|
||||
// Check fallback to the original items after merging, which result in incorrect ordering.
|
||||
metricIDs = metricIDs[:0]
|
||||
for i := 0; i < maxMetricIDsPerRow-3; i++ {
|
||||
metricIDs = append(metricIDs, uint64(123))
|
||||
}
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
"x",
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
"x",
|
||||
})
|
||||
f([]string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
}, []string{
|
||||
"\x00aa",
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
})
|
||||
f([]string{
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
}, []string{
|
||||
x("foo", "bar", metricIDs),
|
||||
x("foo", "bar", []uint64{123, 123, 125}),
|
||||
x("foo", "bar", []uint64{123, 124}),
|
||||
})
|
||||
}
|
||||
|
||||
func TestRemoveDuplicateMetricIDs(t *testing.T) {
|
||||
f := func(metricIDs, expectedMetricIDs []uint64) {
|
||||
t.Helper()
|
||||
a := removeDuplicateMetricIDs(metricIDs)
|
||||
if !reflect.DeepEqual(a, expectedMetricIDs) {
|
||||
t.Fatalf("unexpected result from removeDuplicateMetricIDs:\ngot\n%d\nwant\n%d", a, expectedMetricIDs)
|
||||
|
|
Loading…
Reference in a new issue