mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
160 lines
3.9 KiB
Go
160 lines
3.9 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestBlockStreamReaderSingleRow(t *testing.T) {
|
|
rows := []rawRow{{
|
|
Timestamp: 12334545,
|
|
Value: 1.2345,
|
|
PrecisionBits: defaultPrecisionBits,
|
|
}}
|
|
testBlocksStreamReader(t, rows, 1)
|
|
}
|
|
|
|
func TestBlockStreamReaderSingleBlockManyRows(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
for i := 0; i < maxRowsPerBlock; i++ {
|
|
r.Value = rng.Float64()*1e9 - 5e8
|
|
r.Timestamp = int64(i * 1e9)
|
|
rows = append(rows, r)
|
|
}
|
|
testBlocksStreamReader(t, rows, 1)
|
|
}
|
|
|
|
func TestBlockStreamReaderSingleTSIDManyBlocks(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = 1
|
|
for i := 0; i < 5*maxRowsPerBlock; i++ {
|
|
r.Value = rng.NormFloat64() * 1e4
|
|
r.Timestamp = int64(rng.NormFloat64() * 1e9)
|
|
rows = append(rows, r)
|
|
}
|
|
testBlocksStreamReader(t, rows, 5)
|
|
}
|
|
|
|
func TestBlockStreamReaderManyTSIDSingleRow(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
for i := 0; i < 1000; i++ {
|
|
r.TSID.MetricID = uint64(i)
|
|
r.Value = rng.Float64()*1e9 - 5e8
|
|
r.Timestamp = int64(i * 1e9)
|
|
rows = append(rows, r)
|
|
}
|
|
testBlocksStreamReader(t, rows, 1000)
|
|
}
|
|
|
|
func TestBlockStreamReaderManyTSIDManyRows(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
const blocks = 123
|
|
for i := 0; i < 3210; i++ {
|
|
r.TSID.MetricID = uint64((1e9 - i) % blocks)
|
|
r.Value = rng.Float64()
|
|
r.Timestamp = int64(rng.Float64() * 1e9)
|
|
rows = append(rows, r)
|
|
}
|
|
testBlocksStreamReader(t, rows, blocks)
|
|
}
|
|
|
|
func TestBlockStreamReaderReadConcurrent(t *testing.T) {
|
|
rng := rand.New(rand.NewSource(1))
|
|
var rows []rawRow
|
|
var r rawRow
|
|
r.PrecisionBits = defaultPrecisionBits
|
|
const blocks = 123
|
|
for i := 0; i < 3210; i++ {
|
|
r.TSID.MetricID = uint64((1e9 - i) % blocks)
|
|
r.Value = rng.Float64()
|
|
r.Timestamp = int64(rng.Float64() * 1e9)
|
|
rows = append(rows, r)
|
|
}
|
|
var mp inmemoryPart
|
|
mp.InitFromRows(rows)
|
|
|
|
ch := make(chan error, 5)
|
|
for i := 0; i < 5; i++ {
|
|
go func() {
|
|
ch <- testBlockStreamReaderReadRows(&mp, rows)
|
|
}()
|
|
}
|
|
for i := 0; i < 5; i++ {
|
|
select {
|
|
case err := <-ch:
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
case <-time.After(time.Second * 5):
|
|
t.Fatalf("timeout")
|
|
}
|
|
}
|
|
}
|
|
|
|
func testBlockStreamReaderReadRows(mp *inmemoryPart, rows []rawRow) error {
|
|
var bsr blockStreamReader
|
|
bsr.MustInitFromInmemoryPart(mp)
|
|
rowsCount := 0
|
|
for bsr.NextBlock() {
|
|
if err := bsr.Block.UnmarshalData(); err != nil {
|
|
return fmt.Errorf("cannot unmarshal block data: %w", err)
|
|
}
|
|
for bsr.Block.nextRow() {
|
|
rowsCount++
|
|
}
|
|
}
|
|
if err := bsr.Error(); err != nil {
|
|
return fmt.Errorf("unexpected error in bsr.NextBlock: %w", err)
|
|
}
|
|
if rowsCount != len(rows) {
|
|
return fmt.Errorf("unexpected number of rows read; got %d; want %d", rowsCount, len(rows))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func testBlocksStreamReader(t *testing.T, rows []rawRow, expectedBlocksCount int) {
|
|
t.Helper()
|
|
|
|
bsr := newTestBlockStreamReader(rows)
|
|
blocksCount := 0
|
|
rowsCount := 0
|
|
for bsr.NextBlock() {
|
|
if err := bsr.Block.UnmarshalData(); err != nil {
|
|
t.Fatalf("cannot unmarshal block data: %s", err)
|
|
}
|
|
for bsr.Block.nextRow() {
|
|
rowsCount++
|
|
}
|
|
blocksCount++
|
|
}
|
|
if err := bsr.Error(); err != nil {
|
|
t.Fatalf("unexpected error in bsr.NextBlock: %s", err)
|
|
}
|
|
if blocksCount != expectedBlocksCount {
|
|
t.Fatalf("unexpected number of blocks read; got %d; want %d", blocksCount, expectedBlocksCount)
|
|
}
|
|
if rowsCount != len(rows) {
|
|
t.Fatalf("unexpected number of rows read; got %d; want %d", rowsCount, len(rows))
|
|
}
|
|
}
|
|
|
|
func newTestBlockStreamReader(rows []rawRow) *blockStreamReader {
|
|
var mp inmemoryPart
|
|
mp.InitFromRows(rows)
|
|
var bsr blockStreamReader
|
|
bsr.MustInitFromInmemoryPart(&mp)
|
|
return &bsr
|
|
}
|