From 5fa952549893d90ef9d1074d4d61ab4134e7a010 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 6 Sep 2022 13:04:29 +0300
Subject: [PATCH] lib/storage: verify that timestamps in block are in the range specified by blockHeader.{Min,Max}Timestamp when unpacking the block

This should reduce chances of unnoticed on-disk data corruption.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011

This change modifies the format for data exported via /api/v1/export/native -
now this data contains MaxTimestamp and PrecisionBits fields from blockHeader.
This is OK, since the native export format is undocumented.
---
 docs/CHANGELOG.md | 6 ++-
 lib/encoding/encoding.go | 2 +-
 lib/storage/block.go | 81 +++++++++++++------------------
 lib/storage/block_header.go | 64 +++++++++++++++++++++++++++++
 lib/storage/block_test.go | 45 +++++++++++++++++----
 5 files changed, 137 insertions(+), 61 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 606f74b19..186dc0627 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,7 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
 ## tip
-* FEATURE: set the `start` arg to `end - 5 minutes` if isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues).
+**Update note:** this release changes the data format for [/api/v1/export/native](https://docs.victoriametrics.com/#how-to-export-data-in-native-format) in an incompatible way, so it cannot be imported into older versions of VictoriaMetrics via [/api/v1/import/native](https://docs.victoriametrics.com/#how-to-import-data-in-native-format).
+
+
+* FEATURE: check the correctness of raw sample timestamps stored on disk when reading them. This reduces the probability of silent corruption of the data stored on disk. This should help with [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2998) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3011).
+* FEATURE: set the `start` arg to `end - 5 minutes` if it isn't passed explicitly to [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and [/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3052).
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `vm-native-step-interval` command line flag for `vm-native` mode. New option allows splitting the import process into chunks by time interval. This helps migrating data sets with high churn rate and provides better control over the process. See [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2733).
diff --git a/lib/encoding/encoding.go b/lib/encoding/encoding.go
index 01829c815..b53ee2284 100644
--- a/lib/encoding/encoding.go
+++ b/lib/encoding/encoding.go
@@ -240,7 +240,7 @@ var bbPool bytesutil.ByteBufferPool
 // EnsureNonDecreasingSequence makes sure the first item in a is vMin, the last
 // item in a is vMax and all the items in a are non-decreasing.
 //
-// If this isn't the case the a is fixed accordingly.
+// If this isn't the case then a is fixed accordingly.
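// NOTE: illustrative example of the contract described in the comment above; it is not
// part of the original patch, and only relies on the documented behavior (first item is
// forced to vMin, last item to vMax, sequence stays non-decreasing):
//
//	a := []int64{10, 11, 12, 30}
//	encoding.EnsureNonDecreasingSequence(a, 5, 20)
//	// a is now [5 11 12 20]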
 func EnsureNonDecreasingSequence(a []int64, vMin, vMax int64) {
 if vMax < vMin {
 logger.Panicf("BUG: vMax cannot be smaller than vMin; got %d vs %d", vMax, vMin)
diff --git a/lib/storage/block.go b/lib/storage/block.go
index edc7507e6..84c7ee626 100644
--- a/lib/storage/block.go
+++ b/lib/storage/block.go
@@ -2,7 +2,6 @@ package storage
 import (
 "fmt"
- "math"
 "sync"
 "sync/atomic"
@@ -272,6 +271,11 @@ func (b *Block) UnmarshalData() error {
 if b.bh.PrecisionBits < 64 {
 // Recover timestamps order after lossy compression.
 encoding.EnsureNonDecreasingSequence(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp)
+ } else {
+ // Ensure timestamps are in the range [MinTimestamp ... MaxTimestamp] and are ordered.
+ if err := checkTimestampsBounds(b.timestamps, b.bh.MinTimestamp, b.bh.MaxTimestamp); err != nil {
+ return err
+ }
 }
 b.timestampsData = b.timestampsData[:0]
@@ -290,6 +294,27 @@ func (b *Block) UnmarshalData() error {
 return nil
 }
+func checkTimestampsBounds(timestamps []int64, minTimestamp, maxTimestamp int64) error {
+ if len(timestamps) == 0 {
+ return nil
+ }
+ tsPrev := timestamps[0]
+ if tsPrev < minTimestamp {
+ return fmt.Errorf("timestamp for the row 0 out of %d rows cannot be smaller than %d; got %d", len(timestamps), minTimestamp, tsPrev)
+ }
+ for i, ts := range timestamps[1:] {
+ if ts < tsPrev {
+ return fmt.Errorf("timestamp for the row %d cannot be smaller than the timestamp for the row %d (total %d rows); got %d vs %d",
+ i+1, i, len(timestamps), ts, tsPrev)
+ }
+ tsPrev = ts
+ }
+ if tsPrev > maxTimestamp {
+ return fmt.Errorf("timestamp for the row %d (the last one) cannot be bigger than %d; got %d", len(timestamps)-1, maxTimestamp, tsPrev)
+ }
+ return nil
+}
+
 // AppendRowsWithTimeRangeFilter filters samples from b according to tr and appends them to dst*.
 //
 // It is expected that UnmarshalData has been already called on b.
@@ -326,16 +351,9 @@ func (b *Block) filterTimestamps(tr TimeRange) ([]int64, []int64) {
 // The marshaled value must be unmarshaled with UnmarshalPortable function.
 func (b *Block) MarshalPortable(dst []byte) []byte {
 b.MarshalData(0, 0)
-
- dst = encoding.MarshalVarInt64(dst, b.bh.MinTimestamp)
- dst = encoding.MarshalVarInt64(dst, b.bh.FirstValue)
- dst = encoding.MarshalVarUint64(dst, uint64(b.bh.RowsCount))
- dst = encoding.MarshalVarInt64(dst, int64(b.bh.Scale))
- dst = append(dst, byte(b.bh.TimestampsMarshalType))
- dst = append(dst, byte(b.bh.ValuesMarshalType))
+ dst = b.bh.marshalPortable(dst)
 dst = encoding.MarshalBytes(dst, b.timestampsData)
 dst = encoding.MarshalBytes(dst, b.valuesData)
-
 return dst
 }
@@ -344,50 +362,10 @@ func (b *Block) MarshalPortable(dst []byte) []byte {
 // It is assumed that the block has been marshaled with MarshalPortable.
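// NOTE: the following sketch of the portable block layout is not a separate spec; it is
// derived from blockHeader.marshalPortable and Block.MarshalPortable as changed by this
// patch, and shown here only for orientation:
//
//	varint64  MinTimestamp
//	varint64  MaxTimestamp      (new in this patch)
//	varint64  FirstValue
//	varuint64 RowsCount
//	varint64  Scale
//	byte      TimestampsMarshalType
//	byte      ValuesMarshalType
//	byte      PrecisionBits     (new in this patch)
//	bytes     timestampsData    (written via encoding.MarshalBytes)
//	bytes     valuesData        (written via encoding.MarshalBytes)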
 func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
 b.Reset()
-
- // Read header
- src, firstTimestamp, err := encoding.UnmarshalVarInt64(src)
+ src, err := b.bh.unmarshalPortable(src)
 if err != nil {
- return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
+ return src, err
 }
- b.bh.MinTimestamp = firstTimestamp
- src, firstValue, err := encoding.UnmarshalVarInt64(src)
- if err != nil {
- return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
- }
- b.bh.FirstValue = firstValue
- src, rowsCount, err := encoding.UnmarshalVarUint64(src)
- if err != nil {
- return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
- }
- if rowsCount > math.MaxUint32 {
- return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
- }
- b.bh.RowsCount = uint32(rowsCount)
- src, scale, err := encoding.UnmarshalVarInt64(src)
- if err != nil {
- return src, fmt.Errorf("cannot unmarshal scale: %w", err)
- }
- if scale < math.MinInt16 {
- return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
- }
- if scale > math.MaxInt16 {
- return src, fmt.Errorf("got too big scale=%d; it mustn't exceeed %d", scale, math.MaxInt16)
- }
- b.bh.Scale = int16(scale)
- if len(src) < 1 {
- return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
- }
- b.bh.TimestampsMarshalType = encoding.MarshalType(src[0])
- src = src[1:]
- if len(src) < 1 {
- return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
- }
- b.bh.ValuesMarshalType = encoding.MarshalType(src[0])
- src = src[1:]
- b.bh.PrecisionBits = 64
-
- // Read data
 src, timestampsData, err := encoding.UnmarshalBytes(src)
 if err != nil {
 return src, fmt.Errorf("cannot read timestampsData: %w", err)
 }
 b.timestampsData = append(b.timestampsData[:0], timestampsData...)
@@ -399,7 +377,6 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
 }
 b.valuesData = append(b.valuesData[:0], valuesData...)
- // Validate
 if err := b.bh.validate(); err != nil {
 return src, fmt.Errorf("invalid blockHeader: %w", err)
 }
diff --git a/lib/storage/block_header.go b/lib/storage/block_header.go
index 38b49c44f..6bade6216 100644
--- a/lib/storage/block_header.go
+++ b/lib/storage/block_header.go
@@ -2,6 +2,7 @@ package storage
 import (
 "fmt"
+ "math"
 "sort"
 "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -154,6 +155,69 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
 return src, err
 }
+func (bh *blockHeader) marshalPortable(dst []byte) []byte {
+ dst = encoding.MarshalVarInt64(dst, bh.MinTimestamp)
+ dst = encoding.MarshalVarInt64(dst, bh.MaxTimestamp)
+ dst = encoding.MarshalVarInt64(dst, bh.FirstValue)
+ dst = encoding.MarshalVarUint64(dst, uint64(bh.RowsCount))
+ dst = encoding.MarshalVarInt64(dst, int64(bh.Scale))
+ dst = append(dst, byte(bh.TimestampsMarshalType), byte(bh.ValuesMarshalType), byte(bh.PrecisionBits))
+ return dst
+}
+
+func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
+ src, minTimestamp, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal minTimestamp: %w", err)
+ }
+ bh.MinTimestamp = minTimestamp
+ src, maxTimestamp, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal maxTimestamp: %w", err)
+ }
+ bh.MaxTimestamp = maxTimestamp
+ src, firstValue, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
+ }
+ bh.FirstValue = firstValue
+ src, rowsCount, err := encoding.UnmarshalVarUint64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
+ }
+ if rowsCount > math.MaxUint32 {
+ return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
+ }
+ bh.RowsCount = uint32(rowsCount)
+ src, scale, err := encoding.UnmarshalVarInt64(src)
+ if err != nil {
+ return src, fmt.Errorf("cannot unmarshal scale: %w", err)
+ }
+ if scale < math.MinInt16 {
+ return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
+ }
+ if scale > math.MaxInt16 {
+ return src, fmt.Errorf("got too big scale=%d; it mustn't exceed %d", scale, math.MaxInt16)
+ }
+ bh.Scale = int16(scale)
+ if len(src) < 1 {
+ return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
+ }
+ bh.TimestampsMarshalType = encoding.MarshalType(src[0])
+ src = src[1:]
+ if len(src) < 1 {
+ return src, fmt.Errorf("cannot unmarshal marshalType for values from %d bytes; need at least %d bytes", len(src), 1)
+ }
+ bh.ValuesMarshalType = encoding.MarshalType(src[0])
+ src = src[1:]
+ if len(src) < 1 {
+ return src, fmt.Errorf("cannot unmarshal precisionBits from %d bytes; need at least %d bytes", len(src), 1)
+ }
+ bh.PrecisionBits = uint8(src[0])
+ src = src[1:]
+ return src, nil
+}
+
 func (bh *blockHeader) validate() error {
 if bh.RowsCount == 0 {
 return fmt.Errorf("RowsCount in block header cannot be zero")
diff --git a/lib/storage/block_test.go b/lib/storage/block_test.go
index e057b465a..86701860c 100644
--- a/lib/storage/block_test.go
+++ b/lib/storage/block_test.go
@@ -1,10 +1,13 @@ package storage
 import (
+ "fmt"
 "math/rand"
 "reflect"
 "strings"
 "testing"
+
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 )
 func TestBlockMarshalUnmarshalPortable(t *testing.T) {
@@ -15,15 +18,15 @@ func TestBlockMarshalUnmarshalPortable(t *testing.T) {
 b.timestamps = getRandTimestamps(rowsCount)
 b.values = getRandValues(rowsCount)
 b.bh.Scale = int16(rand.Intn(30) - 15)
- b.bh.PrecisionBits = 64
+ b.bh.PrecisionBits = uint8(64 - (i % 64))
 testBlockMarshalUnmarshalPortable(t, &b)
 }
 }
 func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
 var b1, b2 Block
- b1.CopyFrom(b)
 rowsCount := len(b.values)
+ b1.CopyFrom(b)
 data := b1.MarshalPortable(nil)
 if b1.bh.RowsCount != uint32(rowsCount) {
 t.Fatalf("unexpected number of rows marshaled; got %d; want %d", b1.bh.RowsCount, rowsCount)
@@ -60,11 +63,14 @@ func testBlockMarshalUnmarshalPortable(t *testing.T, b *Block) {
 compareBlocksPortable(t, &b2, b, &b1.bh)
 }
-func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader) {
+func compareBlocksPortable(t *testing.T, b1, bExpected *Block, bhExpected *blockHeader) {
 t.Helper()
 if b1.bh.MinTimestamp != bhExpected.MinTimestamp {
 t.Fatalf("unexpected MinTimestamp; got %d; want %d", b1.bh.MinTimestamp, bhExpected.MinTimestamp)
 }
+ if b1.bh.MaxTimestamp != bhExpected.MaxTimestamp {
+ t.Fatalf("unexpected MaxTimestamp; got %d; want %d", b1.bh.MaxTimestamp, bhExpected.MaxTimestamp)
+ }
 if b1.bh.FirstValue != bhExpected.FirstValue {
 t.Fatalf("unexpected FirstValue; got %d; want %d", b1.bh.FirstValue, bhExpected.FirstValue)
 }
@@ -83,11 +89,15 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
 if b1.bh.PrecisionBits != bhExpected.PrecisionBits {
 t.Fatalf("unexpected PrecisionBits; got %d; want %d", b1.bh.PrecisionBits, bhExpected.PrecisionBits)
 }
- if !reflect.DeepEqual(b1.values, b2.values) {
- t.Fatalf("unexpected values; got %d; want %d", b1.values, b2.values)
+
+ timestampsExpected := getTimestampsForPrecisionBits(bExpected.timestamps, bhExpected.PrecisionBits)
+ valuesExpected := getValuesForPrecisionBits(bExpected.values, bhExpected.PrecisionBits)
+
+ if !reflect.DeepEqual(b1.values, valuesExpected) {
+ t.Fatalf("unexpected values for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.values, valuesExpected)
 }
- if !reflect.DeepEqual(b1.timestamps, b2.timestamps) {
- t.Fatalf("unexpected timestamps; got %d; want %d", b1.timestamps, b2.timestamps)
+ if !reflect.DeepEqual(b1.timestamps, timestampsExpected) {
+ t.Fatalf("unexpected timestamps for precisionBits=%d; got\n%d\nwant\n%d", b1.bh.PrecisionBits, b1.timestamps, timestampsExpected)
 }
 if len(b1.values) != int(bhExpected.RowsCount) {
 t.Fatalf("unexpected number of values; got %d; want %d", len(b1.values), bhExpected.RowsCount)
@@ -97,6 +107,27 @@ func compareBlocksPortable(t *testing.T, b1, b2 *Block, bhExpected *blockHeader)
 }
 }
+func getTimestampsForPrecisionBits(timestamps []int64, precisionBits uint8) []int64 {
+ data, marshalType, firstTimestamp := encoding.MarshalTimestamps(nil, timestamps, precisionBits)
+ timestampsAdjusted, err := encoding.UnmarshalTimestamps(nil, data, marshalType, firstTimestamp, len(timestamps))
+ if err != nil {
+ panic(fmt.Errorf("BUG: cannot unmarshal timestamps with precisionBits %d: %s", precisionBits, err))
+ }
+ minTimestamp := timestamps[0]
+ maxTimestamp := timestamps[len(timestamps)-1]
+ encoding.EnsureNonDecreasingSequence(timestampsAdjusted, minTimestamp, maxTimestamp)
+ return timestampsAdjusted
+}
+
+func getValuesForPrecisionBits(values []int64, precisionBits uint8) []int64 {
+ data, marshalType, firstValue := encoding.MarshalValues(nil, values, precisionBits)
+ valuesAdjusted, err := encoding.UnmarshalValues(nil, data, marshalType, firstValue, len(values))
+ if err != nil {
+ panic(fmt.Errorf("BUG: cannot unmarshal values with precisionBits %d: %s", precisionBits, err))
+ }
+ return valuesAdjusted
+}
+
 func getRandValues(rowsCount int) []int64 {
 a := make([]int64, rowsCount)
 for i := 0; i < rowsCount; i++ {