lib/storage: verify that timestamps in block are in the range specified by blockHeader.{Min,Max}Timestamp when upacking the block

This should reduce chances of unnoticed on-disk data corruption.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011

This change modifies the format for data exported via /api/v1/export/native -
now this data contains MaxTimestamp and PrecisionBits fields from blockHeader.

This is OK, since the native export format is undocumented.
This commit is contained in:
Aliaksandr Valialkin 2022-09-06 13:04:29 +03:00
parent b7f3569522
commit 051e722112
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 137 additions and 61 deletions

View file

@ -15,7 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
* FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). **Update note:** this release changes data format for [/api/v1/export/native](https://docs.victoriametrics.com/#how-to-export-data-in-native-format) in incompatible way, so it cannot be imported into older version of VictoriaMetrics via [/api/v1/import/native](https://docs.victoriametrics.com/#how-to-import-data-in-native-format).
* FEATURE: check the correctess of raw sample timestamps stored on disk when reading them. This reduces the probability of possible silent corruption of the data stored on disk. This should help [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011).
* FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate `rate_over_sum(m[d])` as `sum_over_time(m[d])/d`. Previously the `sum_over_time(m[d])` could be improperly divided by smaller than `d` time range. See [rate_over_sum() docs](https://docs.victoriametrics.com/MetricsQL.html#rate_over_sum) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3045). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate `rate_over_sum(m[d])` as `sum_over_time(m[d])/d`. Previously the `sum_over_time(m[d])` could be improperly divided by smaller than `d` time range. See [rate_over_sum() docs](https://docs.victoriametrics.com/MetricsQL.html#rate_over_sum) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3045).

View file

@ -240,7 +240,7 @@ var bbPool bytesutil.ByteBufferPool
// EnsureNonDecreasingSequence makes sure the first item in a is vMin, the last // EnsureNonDecreasingSequence makes sure the first item in a is vMin, the last
// item in a is vMax and all the items in a are non-decreasing. // item in a is vMax and all the items in a are non-decreasing.
// //
// If this isn't the case the a is fixed accordingly. // If this isn't the case then a is fixed accordingly.
func EnsureNonDecreasingSequence(a []int64, vMin, vMax int64) { func EnsureNonDecreasingSequence(a []int64, vMin, vMax int64) {
if vMax < vMin { if vMax < vMin {
logger.Panicf("BUG: vMax cannot be smaller than vMin; got %d vs %d", vMax, vMin) logger.Panicf("BUG: vMax cannot be smaller than vMin; got %d vs %d", vMax, vMin)

View file

@ -2,7 +2,6 @@ package storage
import ( import (
"fmt" "fmt"
"math"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -272,6 +271,11 @@ func (b *Block) UnmarshalData() error {
if b.bh.PrecisionBits < 64 { if b.bh.PrecisionBits < 64 {
// Recover timestamps order after lossy compression. // Recover timestamps order after lossy compression.
encoding.EnsureNonDecreasingSequence(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp) encoding.EnsureNonDecreasingSequence(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp)
} else {
// Ensure timestamps are in the range [MinTimestamp ... MaxTimestamps] and are ordered.
if err := checkTimestampsBounds(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp); err != nil {
return err
}
} }
b.timestampsData = b.timestampsData[:0] b.timestampsData = b.timestampsData[:0]
@ -290,6 +294,27 @@ func (b *Block) UnmarshalData() error {
return nil return nil
} }
func checkTimestampsBounds(timestamps []int64, minTimestamp, maxTimestamp int64) error {
if len(timestamps) == 0 {
return nil
}
tsPrev := timestamps[0]
if tsPrev < minTimestamp {
return fmt.Errorf("timestamp for the row 0 out of %d rows cannot be smaller than %d; got %d", len(timestamps), minTimestamp, tsPrev)
}
for i, ts := range timestamps[1:] {
if ts < tsPrev {
return fmt.Errorf("timestamp for the row %d cannot be smaller than the timestamp for the row %d (total %d rows); got %d vs %d",
i+1, i, len(timestamps), ts, tsPrev)
}
tsPrev = ts
}
if tsPrev > maxTimestamp {
return fmt.Errorf("timestamp for the row %d (the last one) cannot be bigger than %d; got %d", len(timestamps)-1, maxTimestamp, tsPrev)
}
return nil
}
// AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*. // AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*.
// //
// It is expected that UnmarshalData has been already called on b. // It is expected that UnmarshalData has been already called on b.
@ -326,16 +351,9 @@ func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
// The marshaled value must be unmarshaled with UnmarshalPortable function. // The marshaled value must be unmarshaled with UnmarshalPortable function.
func (b *Block) MarshalPortable(dst []byte) []byte { func (b *Block) MarshalPortable(dst []byte) []byte {
b.MarshalData(0, 0) b.MarshalData(0, 0)
dst = b.bh.marshalPortable(dst)
dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
dst = append(dst, byte(b.bh.TimestampsMarshalType))
dst = append(dst, byte(b.bh.ValuesMarshalType))
dst = encoding.MarshalBytes(dst, b.timestampsData) dst = encoding.MarshalBytes(dst, b.timestampsData)
dst = encoding.MarshalBytes(dst, b.valuesData) dst = encoding.MarshalBytes(dst, b.valuesData)
return dst return dst
} }
@ -344,50 +362,10 @@ func (b *Block) MarshalPortable(dst []byte) []byte {
// It is assumed that the block has been marshaled with MarshalPortable. // It is assumed that the block has been marshaled with MarshalPortable.
func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) { func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
b.Reset() b.Reset()
src, err := b.bh.unmarshalPortable(src)
// Read header
src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
if err != nil { if err != nil {
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err) return src, err
} }
b.bh.MinTimestamp = firstTimestamp
src, firstValue, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
}
b.bh.FirstValue = firstValue
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
}
if rowsCount > math.MaxUint32 {
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
}
b.bh.RowsCount = uint32(rowsCount)
src, scale, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
}
if scale < math.MinInt16 {
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
}
if scale > math.MaxInt16 {
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
}
b.bh.Scale = int16(scale)
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
src = src[1:]
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
}
b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
src = src[1:]
b.bh.PrecisionBits = 64
// Read data
src, timestampsData, err := encoding.UnmarshalBytes(src) src, timestampsData, err := encoding.UnmarshalBytes(src)
if err != nil { if err != nil {
return src, fmt.Errorf("cannot read timestampsData: %w", err) return src, fmt.Errorf("cannot read timestampsData: %w", err)
@ -399,7 +377,6 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
} }
b.valuesData = append(b.valuesData[:0], valuesData...) b.valuesData = append(b.valuesData[:0], valuesData...)
// Validate
if err := b.bh.validate(); err != nil { if err := b.bh.validate(); err != nil {
return src, fmt.Errorf("invalid blockHeader: %w", err) return src, fmt.Errorf("invalid blockHeader: %w", err)
} }

View file

@ -2,6 +2,7 @@ package storage
import ( import (
"fmt" "fmt"
"math"
"sort" "sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -154,6 +155,69 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
return src, err return src, err
} }
func (bh *blockHeader) marshalPortable(dst []byte) []byte {
dst = encoding.MarshalVarInt64(dst, bh.MinTimestamp)
dst = encoding.MarshalVarInt64(dst, bh.MaxTimestamp)
dst = encoding.MarshalVarInt64(dst, bh.FirstValue)
dst = encoding.MarshalVarUint64(dst, uint64(bh.RowsCount))
dst = encoding.MarshalVarInt64(dst, int64(bh.Scale))
dst = append(dst, byte(bh.TimestampsMarshalType), byte(bh.ValuesMarshalType), byte(bh.PrecisionBits))
return dst
}
func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
src, minTimestamp, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
}
bh.MinTimestamp = minTimestamp
src, maxTimestamp, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
}
bh.MaxTimestamp = maxTimestamp
src, firstValue, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
}
bh.FirstValue = firstValue
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
}
if rowsCount > math.MaxUint32 {
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
}
bh.RowsCount = uint32(rowsCount)
src, scale, err := encoding.UnmarshalVarInt64(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
}
if scale < math.MinInt16 {
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
}
if scale > math.MaxInt16 {
return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
}
bh.Scale = int16(scale)
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
}
bh.TimestampsMarshalType = encoding.MarshalType(src[0])
src = src[1:]
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
}
bh.ValuesMarshalType = encoding.MarshalType(src[0])
src = src[1:]
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal precisionBits for values from %d bytes; need at least %d bytes", len(src), 1)
}
bh.PrecisionBits = uint8(src[0])
src = src[1:]
return src, nil
}
func (bh *blockHeader) validate() error { func (bh *blockHeader) validate() error {
if bh.RowsCount == 0 { if bh.RowsCount == 0 {
return fmt.Errorf("RowsCount in block header cannot be zero") return fmt.Errorf("RowsCount in block header cannot be zero")

View file

@ -1,10 +1,13 @@
package storage package storage
import ( import (
"fmt"
"math/rand" "math/rand"
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
) )
func TestBlockMarshalUnmarshalPortable(t *testing.T) { func TestBlockMarshalUnmarshalPortable(t *testing.T) {
@ -15,15 +18,15 @@ func TestBlockMarshalUnmarshalPortable(t *testing.T) {
b.timestamps = getRandTimestamps(rowsCount) b.timestamps = getRandTimestamps(rowsCount)
b.values = getRandValues(rowsCount) b.values = getRandValues(rowsCount)
b.bh.Scale = int16(rand.Intn(30) - 15) b.bh.Scale = int16(rand.Intn(30) - 15)
b.bh.PrecisionBits = 64 b.bh.PrecisionBits = uint8(64 - (i % 64))
testBlockMarshalUnmarshalPortable(t, &b) testBlockMarshalUnmarshalPortable(t, &b)
} }
} }
func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) { func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
var b1, b2 Block var b1, b2 Block
b1.CopyFrom(b)
rowsCount := len(b.values) rowsCount := len(b.values)
b1.CopyFrom(b)
data := b1.MarshalPortable(nil) data := b1.MarshalPortable(nil)
if b1.bh.RowsCount != uint32(rowsCount) { if b1.bh.RowsCount != uint32(rowsCount) {
t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount) t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
@ -60,11 +63,14 @@ func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
compareBlocksPortable(t, &b2, b, &b1.bh) compareBlocksPortable(t, &b2, b, &b1.bh)
} }
func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) { func compareBlocksPortable(t *testing.T, b1, bExpected *Block, bhExpected *blockHeader) {
t.Helper() t.Helper()
if b1.bh.MinTimestamp != bhExpected.MinTimestamp { if b1.bh.MinTimestamp != bhExpected.MinTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp) t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp)
} }
if b1.bh.MaxTimestamp != bhExpected.MaxTimestamp {
t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MaxTimestamp, bhExpected.MaxTimestamp)
}
if b1.bh.FirstValue != bhExpected.FirstValue { if b1.bh.FirstValue != bhExpected.FirstValue {
t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue) t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue)
} }
@ -83,11 +89,15 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
if b1.bh.PrecisionBits != bhExpected.PrecisionBits { if b1.bh.PrecisionBits != bhExpected.PrecisionBits {
t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits) t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits)
} }
if !reflect.DeepEqual(b1.values, b2.values) {
t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values) timestampsExpected := getTimestampsForPrecisionBits(bExpected.timestamps, bhExpected.PrecisionBits)
valuesExpected := getValuesForPrecisionBits(bExpected.values, bhExpected.PrecisionBits)
if !reflect.DeepEqual(b1.values, valuesExpected) {
t.Fatalf("unexpected values for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.values, valuesExpected)
} }
if !reflect.DeepEqual(b1.timestamps, b2.timestamps) { if !reflect.DeepEqual(b1.timestamps, timestampsExpected) {
t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps) t.Fatalf("unexpected timestamps for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.timestamps, timestampsExpected)
} }
if len(b1.values) != int(bhExpected.RowsCount) { if len(b1.values) != int(bhExpected.RowsCount) {
t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount) t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount)
@ -97,6 +107,27 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
} }
} }
func getTimestampsForPrecisionBits(timestamps []int64, precisionBits uint8) []int64 {
data, marshalType, firstTimestamp := encoding.MarshalTimestamps(nil, timestamps, precisionBits)
timestampsAdjusted, err := encoding.UnmarshalTimestamps(nil, data, marshalType, firstTimestamp, len(timestamps))
if err != nil {
panic(fmt.Errorf("BUG: cannot unmarshal timestamps with precisionBits %d: %s", precisionBits, err))
}
minTimestamp := timestamps[0]
maxTimestamp := timestamps[len(timestamps)-1]
encoding.EnsureNonDecreasingSequence(timestampsAdjusted, minTimestamp, maxTimestamp)
return timestampsAdjusted
}
func getValuesForPrecisionBits(values []int64, precisionBits uint8) []int64 {
data, marshalType, firstValue := encoding.MarshalValues(nil, values, precisionBits)
valuesAdjusted, err := encoding.UnmarshalValues(nil, data, marshalType, firstValue, len(values))
if err != nil {
panic(fmt.Errorf("BUG: cannot unmarshal values with precisionBits %d: %s", precisionBits, err))
}
return valuesAdjusted
}
func getRandValues(rowsCount int) []int64 { func getRandValues(rowsCount int) []int64 {
a := make([]int64, rowsCount) a := make([]int64, rowsCount)
for i := 0; i < rowsCount; i++ { for i := 0; i < rowsCount; i++ {