mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
c0b852d50d
Callers of InitFromFilePart log the error and exit. It is better to log the error with the path to the part and the call stack directly inside the MustInitFromFilePart() function. This simplifies the code at callers' side while leaving the same level of debuggability.
475 lines
13 KiB
Go
475 lines
13 KiB
Go
package storage
|
|
|
|
import (
|
|
"errors"
|
|
"math/rand"
|
|
"testing"
|
|
)
|
|
|
|
func TestMergeBlockStreamsOneStreamOneRow(t *testing.T) {
|
|
rows := []rawRow{
|
|
{
|
|
Timestamp: 82394327423432,
|
|
Value: 123.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs := []*blockStreamReader{bsr}
|
|
testMergeBlockStreams(t, bsrs, 1, 1, rows[0].Timestamp, rows[0].Timestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = 4
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
for i := 0; i < maxRowsPerBlock; i++ {
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64() * 2332
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs := []*blockStreamReader{bsr}
|
|
testMergeBlockStreams(t, bsrs, 1, maxRowsPerBlock, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = 4
|
|
const blocksCount = 1234
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
for i := 0; i < blocksCount; i++ {
|
|
initTestTSID(&r.TSID)
|
|
r.TSID.MetricID = uint64(i * 123)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64() * 2332
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs := []*blockStreamReader{bsr}
|
|
testMergeBlockStreams(t, bsrs, blocksCount, blocksCount, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
initTestTSID(&r.TSID)
|
|
r.PrecisionBits = 4
|
|
const blocksCount = 1234
|
|
const rowsCount = 4938
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
for i := 0; i < rowsCount; i++ {
|
|
r.TSID.MetricID = uint64(i % blocksCount)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64() * 2332
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs := []*blockStreamReader{bsr}
|
|
testMergeBlockStreams(t, bsrs, blocksCount, rowsCount, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsTwoStreamsOneBlockTwoRows(t *testing.T) {
|
|
// Identical rows
|
|
rows := []rawRow{
|
|
{
|
|
Timestamp: 182394327423432,
|
|
Value: 3123.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr1 := newTestBlockStreamReader(t, rows)
|
|
bsr2 := newTestBlockStreamReader(t, rows)
|
|
bsrs := []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, 1, 2, rows[0].Timestamp, rows[0].Timestamp)
|
|
|
|
// Distinct rows for the same TSID.
|
|
minTimestamp := int64(12332443)
|
|
maxTimestamp := int64(23849834543)
|
|
rows = []rawRow{
|
|
{
|
|
Timestamp: maxTimestamp,
|
|
Value: 3123.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr1 = newTestBlockStreamReader(t, rows)
|
|
rows = []rawRow{
|
|
{
|
|
Timestamp: minTimestamp,
|
|
Value: 23.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr2 = newTestBlockStreamReader(t, rows)
|
|
bsrs = []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, 1, 2, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsTwoStreamsTwoBlocksOneRow(t *testing.T) {
|
|
minTimestamp := int64(4389345)
|
|
maxTimestamp := int64(8394584354)
|
|
|
|
rows := []rawRow{
|
|
{
|
|
TSID: TSID{
|
|
MetricID: 8454,
|
|
},
|
|
Timestamp: minTimestamp,
|
|
Value: 33.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr1 := newTestBlockStreamReader(t, rows)
|
|
|
|
rows = []rawRow{
|
|
{
|
|
TSID: TSID{
|
|
MetricID: 4454,
|
|
},
|
|
Timestamp: maxTimestamp,
|
|
Value: 323.42389,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
},
|
|
}
|
|
bsr2 := newTestBlockStreamReader(t, rows)
|
|
|
|
bsrs := []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, 2, 2, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
|
|
const blocksCount = 1234
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
initTestTSID(&r.TSID)
|
|
r.PrecisionBits = 2
|
|
const rowsCount1 = 4938
|
|
for i := 0; i < rowsCount1; i++ {
|
|
r.TSID.MetricID = uint64(i % blocksCount)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64() * 2332
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr1 := newTestBlockStreamReader(t, rows)
|
|
|
|
rows = rows[:0]
|
|
const rowsCount2 = 3281
|
|
for i := 0; i < rowsCount2; i++ {
|
|
r.TSID.MetricID = uint64((i + 17) % blocksCount)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64() * 2332
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr2 := newTestBlockStreamReader(t, rows)
|
|
|
|
bsrs := []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, blocksCount, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsTwoStreamsBigOverlappingBlocks(t *testing.T) {
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = 5
|
|
const rowsCount1 = maxRowsPerBlock + 234
|
|
for i := 0; i < rowsCount1; i++ {
|
|
r.Timestamp = int64(i * 2894)
|
|
r.Value = float64(int(rng.NormFloat64() * 1e2))
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr1 := newTestBlockStreamReader(t, rows)
|
|
|
|
rows = rows[:0]
|
|
const rowsCount2 = maxRowsPerBlock + 2344
|
|
for i := 0; i < rowsCount2; i++ {
|
|
r.Timestamp = int64(i * 2494)
|
|
r.Value = float64(int(rng.NormFloat64() * 1e2))
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr2 := newTestBlockStreamReader(t, rows)
|
|
|
|
bsrs := []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsTwoStreamsBigSequentialBlocks(t *testing.T) {
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = 5
|
|
const rowsCount1 = maxRowsPerBlock + 234
|
|
for i := 0; i < rowsCount1; i++ {
|
|
r.Timestamp = int64(i * 2894)
|
|
r.Value = float64(int(rng.NormFloat64() * 1e2))
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
maxTimestampB1 := rows[len(rows)-1].Timestamp
|
|
bsr1 := newTestBlockStreamReader(t, rows)
|
|
|
|
rows = rows[:0]
|
|
const rowsCount2 = maxRowsPerBlock - 233
|
|
for i := 0; i < rowsCount2; i++ {
|
|
r.Timestamp = maxTimestampB1 + int64(i*2494)
|
|
r.Value = float64(int(rng.NormFloat64() * 1e2))
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr2 := newTestBlockStreamReader(t, rows)
|
|
|
|
bsrs := []*blockStreamReader{bsr1, bsr2}
|
|
testMergeBlockStreams(t, bsrs, 3, rowsCount1+rowsCount2, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
|
|
var r rawRow
|
|
initTestTSID(&r.TSID)
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
|
|
rng := rand.New(rand.NewSource(1))
|
|
rowsCount := 0
|
|
const blocksCount = 113
|
|
var bsrs []*blockStreamReader
|
|
for i := 0; i < 20; i++ {
|
|
rowsPerStream := rng.Intn(500)
|
|
var rows []rawRow
|
|
for j := 0; j < rowsPerStream; j++ {
|
|
r.TSID.MetricID = uint64(j % blocksCount)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64()
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs = append(bsrs, bsr)
|
|
rowsCount += rowsPerStream
|
|
}
|
|
testMergeBlockStreams(t, bsrs, blocksCount, rowsCount, minTimestamp, maxTimestamp)
|
|
}
|
|
|
|
func TestMergeForciblyStop(t *testing.T) {
|
|
minTimestamp := int64(1<<63 - 1)
|
|
maxTimestamp := int64(-1 << 63)
|
|
|
|
var r rawRow
|
|
initTestTSID(&r.TSID)
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
|
|
rng := rand.New(rand.NewSource(1))
|
|
const blocksCount = 113
|
|
var bsrs []*blockStreamReader
|
|
for i := 0; i < 20; i++ {
|
|
rowsPerStream := rng.Intn(1000)
|
|
var rows []rawRow
|
|
for j := 0; j < rowsPerStream; j++ {
|
|
r.TSID.MetricID = uint64(j % blocksCount)
|
|
r.Timestamp = int64(rng.Intn(1e9))
|
|
r.Value = rng.NormFloat64()
|
|
rows = append(rows, r)
|
|
|
|
if r.Timestamp < minTimestamp {
|
|
minTimestamp = r.Timestamp
|
|
}
|
|
if r.Timestamp > maxTimestamp {
|
|
maxTimestamp = r.Timestamp
|
|
}
|
|
}
|
|
bsr := newTestBlockStreamReader(t, rows)
|
|
bsrs = append(bsrs, bsr)
|
|
}
|
|
|
|
var mp inmemoryPart
|
|
var bsw blockStreamWriter
|
|
bsw.MustInitFromInmemoryPart(&mp, -5)
|
|
ch := make(chan struct{})
|
|
var rowsMerged, rowsDeleted uint64
|
|
close(ch)
|
|
|
|
strg := newTestStorage()
|
|
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
|
|
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
|
|
}
|
|
if rowsMerged != 0 {
|
|
t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, 0)
|
|
}
|
|
if rowsDeleted != 0 {
|
|
t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0)
|
|
}
|
|
}
|
|
|
|
func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBlocksCount, expectedRowsCount int, expectedMinTimestamp, expectedMaxTimestamp int64) {
|
|
t.Helper()
|
|
|
|
var mp inmemoryPart
|
|
|
|
var bsw blockStreamWriter
|
|
bsw.MustInitFromInmemoryPart(&mp, -5)
|
|
|
|
strg := newTestStorage()
|
|
var rowsMerged, rowsDeleted uint64
|
|
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
|
|
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
|
|
}
|
|
|
|
// Verify written data.
|
|
if mp.ph.RowsCount != uint64(expectedRowsCount) {
|
|
t.Fatalf("unexpected rows count in partHeader; got %d; want %d", mp.ph.RowsCount, expectedRowsCount)
|
|
}
|
|
if rowsMerged != mp.ph.RowsCount {
|
|
t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, mp.ph.RowsCount)
|
|
}
|
|
if rowsDeleted != 0 {
|
|
t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0)
|
|
}
|
|
if mp.ph.MinTimestamp != expectedMinTimestamp {
|
|
t.Fatalf("unexpected MinTimestamp in partHeader; got %d; want %d", mp.ph.MinTimestamp, expectedMinTimestamp)
|
|
}
|
|
if mp.ph.MaxTimestamp != expectedMaxTimestamp {
|
|
t.Fatalf("unexpected MaxTimestamp in partHeader; got %d; want %d", mp.ph.MaxTimestamp, expectedMaxTimestamp)
|
|
}
|
|
|
|
var bsr1 blockStreamReader
|
|
bsr1.MustInitFromInmemoryPart(&mp)
|
|
blocksCount := 0
|
|
rowsCount := 0
|
|
var prevTSID TSID
|
|
for bsr1.NextBlock() {
|
|
if bsr1.Block.bh.TSID.Less(&prevTSID) {
|
|
t.Fatalf("the next block cannot have higher TSID than the previous block; got\n%+v vs\n%+v", &bsr1.Block.bh.TSID, &prevTSID)
|
|
}
|
|
prevTSID = bsr1.Block.bh.TSID
|
|
|
|
expectedRowsPerBlock := int(bsr1.Block.bh.RowsCount)
|
|
if expectedRowsPerBlock == 0 {
|
|
t.Fatalf("got zero rows in a block")
|
|
}
|
|
if bsr1.Block.bh.MinTimestamp < expectedMinTimestamp {
|
|
t.Fatalf("too small MinTimestamp in the blockHeader; got %d; cannot be smaller than %d", bsr1.Block.bh.MinTimestamp, expectedMinTimestamp)
|
|
}
|
|
if bsr1.Block.bh.MaxTimestamp > expectedMaxTimestamp {
|
|
t.Fatalf("too big MaxTimestamp in the blockHeader; got %d; cannot be bigger than %d", bsr1.Block.bh.MaxTimestamp, expectedMaxTimestamp)
|
|
}
|
|
|
|
if err := bsr1.Block.UnmarshalData(); err != nil {
|
|
t.Fatalf("cannot unmarshal block from merged stream: %s", err)
|
|
}
|
|
|
|
prevTimestamp := bsr1.Block.bh.MinTimestamp
|
|
blockMaxTimestamp := bsr1.Block.bh.MaxTimestamp
|
|
rowsPerBlock := 0
|
|
for bsr1.Block.nextRow() {
|
|
rowsPerBlock++
|
|
timestamp := bsr1.Block.timestamps[bsr1.Block.nextIdx-1]
|
|
if timestamp < prevTimestamp {
|
|
t.Fatalf("the next timestamp cannot be smaller than the previous timestamp; got %d vs %d", timestamp, prevTimestamp)
|
|
}
|
|
prevTimestamp = timestamp
|
|
}
|
|
if prevTimestamp > blockMaxTimestamp {
|
|
t.Fatalf("the last timestamp cannot be bigger than the MaxTimestamp in the blockHeader; got %d vs %d", prevTimestamp, blockMaxTimestamp)
|
|
}
|
|
if rowsPerBlock != expectedRowsPerBlock {
|
|
t.Fatalf("unexpected rows read in the block; got %d; want %d", rowsPerBlock, expectedRowsPerBlock)
|
|
}
|
|
rowsCount += rowsPerBlock
|
|
blocksCount++
|
|
}
|
|
if err := bsr1.Error(); err != nil {
|
|
t.Fatalf("unexpected error when reading merged stream: %s", err)
|
|
}
|
|
if blocksCount != expectedBlocksCount {
|
|
t.Fatalf("unexpected blocks read from merged stream; got %d; want %d", blocksCount, expectedBlocksCount)
|
|
}
|
|
if rowsCount != expectedRowsCount {
|
|
t.Fatalf("unexpected rows read from merged stream; got %d; want %d", rowsCount, expectedRowsCount)
|
|
}
|
|
}
|