mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: verify that timestamps in block are in the range specified by blockHeader.{Min,Max}Timestamp when unpacking the block
This should reduce chances of unnoticed on-disk data corruption. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011 This change modifies the format for data exported via /api/v1/export/native - now this data contains MaxTimestamp and PrecisionBits fields from blockHeader. This is OK, since the native export format is undocumented.
This commit is contained in:
parent
c5261d5f56
commit
5fa9525498
5 changed files with 137 additions and 61 deletions
|
@ -15,7 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: set the `start` arg to `end - 5 minutes` if it isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues).
|
||||
**Update note:** this release changes the data format for [/api/v1/export/native](https://docs.victoriametrics.com/#how-to-export-data-in-native-format) in an incompatible way, so the exported data cannot be imported into older versions of VictoriaMetrics via [/api/v1/import/native](https://docs.victoriametrics.com/#how-to-import-data-in-native-format).
|
||||
|
||||
|
||||
* FEATURE: check the correctness of raw sample timestamps stored on disk when reading them. This reduces the probability of possible silent corruption of the data stored on disk. This should help [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011).
|
||||
* FEATURE: set the `start` arg to `end - 5 minutes` if it isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052).
|
||||
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `vm-native-step-interval` command line flag for `vm-native` mode. New option allows splitting the import process into chunks by time interval. This helps migrating data sets with high churn rate and provides better control over the process. See [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2733).
|
||||
|
||||
|
|
|
@ -240,7 +240,7 @@ var bbPool bytesutil.ByteBufferPool
|
|||
// EnsureNonDecreasingSequence makes sure the first item in a is vMin, the last
|
||||
// item in a is vMax and all the items in a are non-decreasing.
|
||||
//
|
||||
// If this isn't the case the a is fixed accordingly.
|
||||
// If this isn't the case then a is fixed accordingly.
|
||||
func EnsureNonDecreasingSequence(a []int64, vMin, vMax int64) {
|
||||
if vMax < vMin {
|
||||
logger.Panicf("BUG: vMax cannot be smaller than vMin; got %d vs %d", vMax, vMin)
|
||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -272,6 +271,11 @@ func (b *Block) UnmarshalData() error {
|
|||
if b.bh.PrecisionBits < 64 {
|
||||
// Recover timestamps order after lossy compression.
|
||||
encoding.EnsureNonDecreasingSequence(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp)
|
||||
} else {
|
||||
// Ensure timestamps are in the range [MinTimestamp ... MaxTimestamp] and are ordered.
|
||||
if err := checkTimestampsBounds(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.timestampsData = b.timestampsData[:0]
|
||||
|
||||
|
@ -290,6 +294,27 @@ func (b *Block) UnmarshalData() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// checkTimestampsBounds verifies that timestamps are sorted in non-decreasing
// order and fit into the closed range [minTimestamp, maxTimestamp].
//
// Only the first item is compared against minTimestamp and only the last item
// is compared against maxTimestamp; the ordering check covers the items in
// between. An empty slice is accepted. The checks run in this order: lower
// bound, ordering, upper bound; the first violation found is returned.
func checkTimestampsBounds(timestamps []int64, minTimestamp, maxTimestamp int64) error {
	n := len(timestamps)
	if n == 0 {
		return nil
	}
	if first := timestamps[0]; first < minTimestamp {
		return fmt.Errorf("timestamp for the row 0 out of %d rows cannot be smaller than %d; got %d", n, minTimestamp, first)
	}
	for row := 1; row < n; row++ {
		prev, cur := timestamps[row-1], timestamps[row]
		if cur < prev {
			return fmt.Errorf("timestamp for the row %d cannot be smaller than the timestamp for the row %d (total %d rows); got %d vs %d",
				row, row-1, n, cur, prev)
		}
	}
	if last := timestamps[n-1]; last > maxTimestamp {
		return fmt.Errorf("timestamp for the row %d (the last one) cannot be bigger than %d; got %d", n-1, maxTimestamp, last)
	}
	return nil
}
|
||||
|
||||
// AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*.
|
||||
//
|
||||
// It is expected that UnmarshalData has been already called on b.
|
||||
|
@ -326,16 +351,9 @@ func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
|
|||
// The marshaled value must be unmarshaled with UnmarshalPortable function.
|
||||
func (b *Block) MarshalPortable(dst []byte) []byte {
|
||||
b.MarshalData(0, 0)
|
||||
|
||||
dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
|
||||
dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
|
||||
dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
|
||||
dst = append(dst, byte(b.bh.TimestampsMarshalType))
|
||||
dst = append(dst, byte(b.bh.ValuesMarshalType))
|
||||
dst = b.bh.marshalPortable(dst)
|
||||
dst = encoding.MarshalBytes(dst, b.timestampsData)
|
||||
dst = encoding.MarshalBytes(dst, b.valuesData)
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
|
@ -344,50 +362,10 @@ func (b *Block) MarshalPortable(dst []byte) []byte {
|
|||
// It is assumed that the block has been marshaled with MarshalPortable.
|
||||
func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
|
||||
b.Reset()
|
||||
|
||||
// Read header
|
||||
src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
|
||||
src, err := b.bh.unmarshalPortable(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
|
||||
return src, err
|
||||
}
|
||||
b.bh.MinTimestamp = firstTimestamp
|
||||
src, firstValue, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
|
||||
}
|
||||
b.bh.FirstValue = firstValue
|
||||
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
|
||||
}
|
||||
if rowsCount > math.MaxUint32 {
|
||||
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
|
||||
}
|
||||
b.bh.RowsCount = uint32(rowsCount)
|
||||
src, scale, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
|
||||
}
|
||||
if scale < math.MinInt16 {
|
||||
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
|
||||
}
|
||||
if scale > math.MaxInt16 {
|
||||
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
|
||||
}
|
||||
b.bh.Scale = int16(scale)
|
||||
if len(src) < 1 {
|
||||
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
|
||||
}
|
||||
b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
|
||||
src = src[1:]
|
||||
if len(src) < 1 {
|
||||
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
|
||||
}
|
||||
b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
|
||||
src = src[1:]
|
||||
b.bh.PrecisionBits = 64
|
||||
|
||||
// Read data
|
||||
src, timestampsData, err := encoding.UnmarshalBytes(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot read timestampsData: %w", err)
|
||||
|
@ -399,7 +377,6 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
|
|||
}
|
||||
b.valuesData = append(b.valuesData[:0], valuesData...)
|
||||
|
||||
// Validate
|
||||
if err := b.bh.validate(); err != nil {
|
||||
return src, fmt.Errorf("invalid blockHeader: %w", err)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
|
@ -154,6 +155,69 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
|
|||
return src, err
|
||||
}
|
||||
|
||||
func (bh *blockHeader) marshalPortable(dst []byte) []byte {
|
||||
dst = encoding.MarshalVarInt64(dst, bh.MinTimestamp)
|
||||
dst = encoding.MarshalVarInt64(dst, bh.MaxTimestamp)
|
||||
dst = encoding.MarshalVarInt64(dst, bh.FirstValue)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(bh.RowsCount))
|
||||
dst = encoding.MarshalVarInt64(dst, int64(bh.Scale))
|
||||
dst = append(dst, byte(bh.TimestampsMarshalType), byte(bh.ValuesMarshalType), byte(bh.PrecisionBits))
|
||||
return dst
|
||||
}
|
||||
|
||||
func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
|
||||
src, minTimestamp, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
|
||||
}
|
||||
bh.MinTimestamp = minTimestamp
|
||||
src, maxTimestamp, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
|
||||
}
|
||||
bh.MaxTimestamp = maxTimestamp
|
||||
src, firstValue, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
|
||||
}
|
||||
bh.FirstValue = firstValue
|
||||
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
|
||||
}
|
||||
if rowsCount > math.MaxUint32 {
|
||||
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
|
||||
}
|
||||
bh.RowsCount = uint32(rowsCount)
|
||||
src, scale, err := encoding.UnmarshalVarInt64(src)
|
||||
if err != nil {
|
||||
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
|
||||
}
|
||||
if scale < math.MinInt16 {
|
||||
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
|
||||
}
|
||||
if scale > math.MaxInt16 {
|
||||
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
|
||||
}
|
||||
bh.Scale = int16(scale)
|
||||
if len(src) < 1 {
|
||||
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
|
||||
}
|
||||
bh.TimestampsMarshalType = encoding.MarshalType(src[0])
|
||||
src = src[1:]
|
||||
if len(src) < 1 {
|
||||
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
|
||||
}
|
||||
bh.ValuesMarshalType = encoding.MarshalType(src[0])
|
||||
src = src[1:]
|
||||
if len(src) < 1 {
|
||||
return src, fmt.Errorf("cannot unmarshal precisionBits for values from %d bytes; need at least %d bytes", len(src), 1)
|
||||
}
|
||||
bh.PrecisionBits = uint8(src[0])
|
||||
src = src[1:]
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func (bh *blockHeader) validate() error {
|
||||
if bh.RowsCount == 0 {
|
||||
return fmt.Errorf("RowsCount in block header cannot be zero")
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
)
|
||||
|
||||
func TestBlockMarshalUnmarshalPortable(t *testing.T) {
|
||||
|
@ -15,15 +18,15 @@ func TestBlockMarshalUnmarshalPortable(t *testing.T) {
|
|||
b.timestamps = getRandTimestamps(rowsCount)
|
||||
b.values = getRandValues(rowsCount)
|
||||
b.bh.Scale = int16(rand.Intn(30) - 15)
|
||||
b.bh.PrecisionBits = 64
|
||||
b.bh.PrecisionBits = uint8(64 - (i % 64))
|
||||
testBlockMarshalUnmarshalPortable(t, &b)
|
||||
}
|
||||
}
|
||||
|
||||
func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
|
||||
var b1, b2 Block
|
||||
b1.CopyFrom(b)
|
||||
rowsCount := len(b.values)
|
||||
b1.CopyFrom(b)
|
||||
data := b1.MarshalPortable(nil)
|
||||
if b1.bh.RowsCount != uint32(rowsCount) {
|
||||
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
|
||||
|
@ -60,11 +63,14 @@ func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
|
|||
compareBlocksPortable(t, &b2, b, &b1.bh)
|
||||
}
|
||||
|
||||
func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) {
|
||||
func compareBlocksPortable(t *testing.T, b1, bExpected *Block, bhExpected *blockHeader) {
|
||||
t.Helper()
|
||||
if b1.bh.MinTimestamp != bhExpected.MinTimestamp {
|
||||
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp)
|
||||
}
|
||||
if b1.bh.MaxTimestamp != bhExpected.MaxTimestamp {
|
||||
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MaxTimestamp, bhExpected.MaxTimestamp)
|
||||
}
|
||||
if b1.bh.FirstValue != bhExpected.FirstValue {
|
||||
t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue)
|
||||
}
|
||||
|
@ -83,11 +89,15 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
|
|||
if b1.bh.PrecisionBits != bhExpected.PrecisionBits {
|
||||
t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits)
|
||||
}
|
||||
if !reflect.DeepEqual(b1.values, b2.values) {
|
||||
t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values)
|
||||
|
||||
timestampsExpected := getTimestampsForPrecisionBits(bExpected.timestamps, bhExpected.PrecisionBits)
|
||||
valuesExpected := getValuesForPrecisionBits(bExpected.values, bhExpected.PrecisionBits)
|
||||
|
||||
if !reflect.DeepEqual(b1.values, valuesExpected) {
|
||||
t.Fatalf("unexpected values for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.values, valuesExpected)
|
||||
}
|
||||
if !reflect.DeepEqual(b1.timestamps, b2.timestamps) {
|
||||
t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps)
|
||||
if !reflect.DeepEqual(b1.timestamps, timestampsExpected) {
|
||||
t.Fatalf("unexpected timestamps for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.timestamps, timestampsExpected)
|
||||
}
|
||||
if len(b1.values) != int(bhExpected.RowsCount) {
|
||||
t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount)
|
||||
|
@ -97,6 +107,27 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
|
|||
}
|
||||
}
|
||||
|
||||
func getTimestampsForPrecisionBits(timestamps []int64, precisionBits uint8) []int64 {
|
||||
data, marshalType, firstTimestamp := encoding.MarshalTimestamps(nil, timestamps, precisionBits)
|
||||
timestampsAdjusted, err := encoding.UnmarshalTimestamps(nil, data, marshalType, firstTimestamp, len(timestamps))
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot unmarshal timestamps with precisionBits %d: %s", precisionBits, err))
|
||||
}
|
||||
minTimestamp := timestamps[0]
|
||||
maxTimestamp := timestamps[len(timestamps)-1]
|
||||
encoding.EnsureNonDecreasingSequence(timestampsAdjusted, minTimestamp, maxTimestamp)
|
||||
return timestampsAdjusted
|
||||
}
|
||||
|
||||
func getValuesForPrecisionBits(values []int64, precisionBits uint8) []int64 {
|
||||
data, marshalType, firstValue := encoding.MarshalValues(nil, values, precisionBits)
|
||||
valuesAdjusted, err := encoding.UnmarshalValues(nil, data, marshalType, firstValue, len(values))
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot unmarshal values with precisionBits %d: %s", precisionBits, err))
|
||||
}
|
||||
return valuesAdjusted
|
||||
}
|
||||
|
||||
func getRandValues(rowsCount int) []int64 {
|
||||
a := make([]int64, rowsCount)
|
||||
for i := 0; i < rowsCount; i++ {
|
||||
|
|
Loading…
Reference in a new issue