mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
389 lines
12 KiB
Go
389 lines
12 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// blockStreamReader represents block stream reader.
|
|
type blockStreamReader struct {
|
|
// Currently active block.
|
|
Block Block
|
|
|
|
// Contains TSID for the previous block.
|
|
// This field is needed for checking that TSIDs
|
|
// increase over time when reading blocks.
|
|
tsidPrev TSID
|
|
|
|
// Filesystem path to the stream reader.
|
|
//
|
|
// Is empty for inmemory stream readers.
|
|
path string
|
|
|
|
ph partHeader
|
|
|
|
// Use io.Reader type for timestampsReader and valuesReader
|
|
// in order to remove I2I conversion in readBlock
|
|
// when passing them to fs.ReadFullData
|
|
timestampsReader io.Reader
|
|
valuesReader io.Reader
|
|
|
|
indexReader filestream.ReadCloser
|
|
|
|
mrs []metaindexRow
|
|
|
|
// Points the current mr from mrs.
|
|
mr *metaindexRow
|
|
|
|
// The total number of rows read so far.
|
|
rowsCount uint64
|
|
|
|
// The total number of blocks read so far.
|
|
blocksCount uint64
|
|
|
|
// The number of block headers in the current index block.
|
|
indexBlockHeadersCount uint32
|
|
|
|
timestampsBlockOffset uint64
|
|
valuesBlockOffset uint64
|
|
indexBlockOffset uint64
|
|
|
|
prevTimestampsBlockOffset uint64
|
|
prevTimestampsData []byte
|
|
|
|
indexData []byte
|
|
compressedIndexData []byte
|
|
|
|
// Cursor to indexData.
|
|
indexCursor []byte
|
|
|
|
err error
|
|
}
|
|
|
|
func (bsr *blockStreamReader) assertWriteClosers() {
|
|
_ = bsr.timestampsReader.(filestream.ReadCloser)
|
|
_ = bsr.valuesReader.(filestream.ReadCloser)
|
|
}
|
|
|
|
func (bsr *blockStreamReader) reset() {
|
|
bsr.Block.Reset()
|
|
|
|
bsr.path = ""
|
|
|
|
bsr.ph.Reset()
|
|
|
|
bsr.timestampsReader = nil
|
|
bsr.valuesReader = nil
|
|
bsr.indexReader = nil
|
|
|
|
bsr.mrs = bsr.mrs[:0]
|
|
bsr.mr = nil
|
|
|
|
bsr.rowsCount = 0
|
|
bsr.blocksCount = 0
|
|
bsr.indexBlockHeadersCount = 0
|
|
|
|
bsr.timestampsBlockOffset = 0
|
|
bsr.valuesBlockOffset = 0
|
|
bsr.indexBlockOffset = 0
|
|
|
|
bsr.prevTimestampsBlockOffset = 0
|
|
bsr.prevTimestampsData = bsr.prevTimestampsData[:0]
|
|
|
|
bsr.indexData = bsr.indexData[:0]
|
|
bsr.compressedIndexData = bsr.compressedIndexData[:0]
|
|
|
|
bsr.indexCursor = nil
|
|
|
|
bsr.err = nil
|
|
}
|
|
|
|
// String returns human-readable representation of bsr.
|
|
func (bsr *blockStreamReader) String() string {
|
|
if len(bsr.path) > 0 {
|
|
return bsr.path
|
|
}
|
|
return bsr.ph.String()
|
|
}
|
|
|
|
// InitFromInmemoryPart initializes bsr from the given mp.
|
|
func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
|
|
bsr.reset()
|
|
|
|
bsr.ph = mp.ph
|
|
bsr.timestampsReader = mp.timestampsData.NewReader()
|
|
bsr.valuesReader = mp.valuesData.NewReader()
|
|
bsr.indexReader = mp.indexData.NewReader()
|
|
|
|
var err error
|
|
bsr.mrs, err = unmarshalMetaindexRows(bsr.mrs[:0], mp.metaindexData.NewReader())
|
|
if err != nil {
|
|
logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err)
|
|
}
|
|
|
|
bsr.assertWriteClosers()
|
|
}
|
|
|
|
// InitFromFilePart initializes bsr from a file-based part on the given path.
|
|
//
|
|
// Files in the part are always read without OS cache pollution,
|
|
// since they are usually deleted after the merge.
|
|
func (bsr *blockStreamReader) InitFromFilePart(path string) error {
|
|
bsr.reset()
|
|
|
|
path = filepath.Clean(path)
|
|
|
|
if err := bsr.ph.ParseFromPath(path); err != nil {
|
|
return fmt.Errorf("cannot parse path to part: %w", err)
|
|
}
|
|
|
|
timestampsPath := path + "/timestamps.bin"
|
|
timestampsFile, err := filestream.Open(timestampsPath, true)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot open timestamps file in stream mode: %w", err)
|
|
}
|
|
|
|
valuesPath := path + "/values.bin"
|
|
valuesFile, err := filestream.Open(valuesPath, true)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
return fmt.Errorf("cannot open values file in stream mode: %w", err)
|
|
}
|
|
|
|
indexPath := path + "/index.bin"
|
|
indexFile, err := filestream.Open(indexPath, true)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
valuesFile.MustClose()
|
|
return fmt.Errorf("cannot open index file in stream mode: %w", err)
|
|
}
|
|
|
|
metaindexPath := path + "/metaindex.bin"
|
|
metaindexFile, err := filestream.Open(metaindexPath, true)
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
valuesFile.MustClose()
|
|
indexFile.MustClose()
|
|
return fmt.Errorf("cannot open metaindex file in stream mode: %w", err)
|
|
}
|
|
mrs, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
|
|
metaindexFile.MustClose()
|
|
if err != nil {
|
|
timestampsFile.MustClose()
|
|
valuesFile.MustClose()
|
|
indexFile.MustClose()
|
|
return fmt.Errorf("cannot unmarshal metaindex rows from file part %q: %w", metaindexPath, err)
|
|
}
|
|
|
|
bsr.path = path
|
|
bsr.timestampsReader = timestampsFile
|
|
bsr.valuesReader = valuesFile
|
|
bsr.indexReader = indexFile
|
|
bsr.mrs = mrs
|
|
|
|
bsr.assertWriteClosers()
|
|
|
|
return nil
|
|
}
|
|
|
|
// MustClose closes the bsr.
|
|
//
|
|
// It closes *Reader files passed to Init.
|
|
func (bsr *blockStreamReader) MustClose() {
|
|
bsr.timestampsReader.(filestream.ReadCloser).MustClose()
|
|
bsr.valuesReader.(filestream.ReadCloser).MustClose()
|
|
bsr.indexReader.MustClose()
|
|
|
|
bsr.reset()
|
|
}
|
|
|
|
// Error returns the last error.
|
|
func (bsr *blockStreamReader) Error() error {
|
|
if bsr.err == nil || bsr.err == io.EOF {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("error when reading part %q: %w", bsr, bsr.err)
|
|
}
|
|
|
|
// NextBlock advances bsr to the next block.
|
|
func (bsr *blockStreamReader) NextBlock() bool {
|
|
if bsr.err != nil {
|
|
return false
|
|
}
|
|
bsr.tsidPrev = bsr.Block.bh.TSID
|
|
bsr.Block.Reset()
|
|
err := bsr.readBlock()
|
|
if err == nil {
|
|
if bsr.Block.bh.TSID.Less(&bsr.tsidPrev) {
|
|
bsr.err = fmt.Errorf("possible data corruption: the next TSID=%v is smaller than the previous TSID=%v", &bsr.Block.bh.TSID, &bsr.tsidPrev)
|
|
return false
|
|
}
|
|
if bsr.Block.bh.RowsCount == 0 {
|
|
bsr.err = fmt.Errorf("invalid block read with zero rows; block=%+v", &bsr.Block)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
if err == io.EOF {
|
|
bsr.err = io.EOF
|
|
return false
|
|
}
|
|
|
|
bsr.err = fmt.Errorf("cannot read next block: %w", err)
|
|
return false
|
|
}
|
|
|
|
func (bsr *blockStreamReader) readBlock() error {
|
|
if len(bsr.indexCursor) == 0 {
|
|
if bsr.mr != nil && bsr.indexBlockHeadersCount != bsr.mr.BlockHeadersCount {
|
|
return fmt.Errorf("invalid number of block headers in the previous index block at offset %d; got %d; want %d",
|
|
bsr.prevIndexBlockOffset(), bsr.indexBlockHeadersCount, bsr.mr.BlockHeadersCount)
|
|
}
|
|
bsr.indexBlockHeadersCount = 0
|
|
if err := bsr.readIndexBlock(); err != nil {
|
|
if err == io.EOF {
|
|
return io.EOF
|
|
}
|
|
return fmt.Errorf("cannot read index block: %w", err)
|
|
}
|
|
}
|
|
|
|
// Read block header.
|
|
if len(bsr.indexCursor) < marshaledBlockHeaderSize {
|
|
return fmt.Errorf("too short index data for reading block header at offset %d; got %d bytes; want %d bytes",
|
|
bsr.prevIndexBlockOffset(), len(bsr.indexCursor), marshaledBlockHeaderSize)
|
|
}
|
|
bsr.Block.headerData = append(bsr.Block.headerData[:0], bsr.indexCursor[:marshaledBlockHeaderSize]...)
|
|
bsr.indexCursor = bsr.indexCursor[marshaledBlockHeaderSize:]
|
|
tail, err := bsr.Block.bh.Unmarshal(bsr.Block.headerData)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse block header read from index data at offset %d: %w", bsr.prevIndexBlockOffset(), err)
|
|
}
|
|
if len(tail) > 0 {
|
|
return fmt.Errorf("non-empty tail left after parsing block header at offset %d: %x", bsr.prevIndexBlockOffset(), tail)
|
|
}
|
|
|
|
bsr.blocksCount++
|
|
if bsr.blocksCount > bsr.ph.BlocksCount {
|
|
return fmt.Errorf("too many blocks found in the block stream; got %d; cannot be bigger than %d", bsr.blocksCount, bsr.ph.BlocksCount)
|
|
}
|
|
|
|
// Validate block header.
|
|
bsr.rowsCount += uint64(bsr.Block.bh.RowsCount)
|
|
if bsr.rowsCount > bsr.ph.RowsCount {
|
|
return fmt.Errorf("too many rows found in the block stream; got %d; cannot be bigger than %d", bsr.rowsCount, bsr.ph.RowsCount)
|
|
}
|
|
if bsr.Block.bh.MinTimestamp < bsr.ph.MinTimestamp {
|
|
return fmt.Errorf("invalid MinTimestamp at block header at offset %d; got %d; cannot be smaller than %d",
|
|
bsr.prevIndexBlockOffset(), bsr.Block.bh.MinTimestamp, bsr.ph.MinTimestamp)
|
|
}
|
|
if bsr.Block.bh.MaxTimestamp > bsr.ph.MaxTimestamp {
|
|
return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d",
|
|
bsr.prevIndexBlockOffset(), bsr.Block.bh.MaxTimestamp, bsr.ph.MaxTimestamp)
|
|
}
|
|
usePrevTimestamps := len(bsr.prevTimestampsData) > 0 && bsr.Block.bh.TimestampsBlockOffset == bsr.prevTimestampsBlockOffset
|
|
if usePrevTimestamps {
|
|
if int(bsr.Block.bh.TimestampsBlockSize) != len(bsr.prevTimestampsData) {
|
|
return fmt.Errorf("invalid TimestampsBlockSize at block header at offset %d; got %d; want %d",
|
|
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockSize, len(bsr.prevTimestampsData))
|
|
}
|
|
} else if bsr.Block.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset {
|
|
return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d",
|
|
bsr.prevIndexBlockOffset(), bsr.Block.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset)
|
|
}
|
|
if bsr.Block.bh.ValuesBlockOffset != bsr.valuesBlockOffset {
|
|
return fmt.Errorf("invalid ValuesBlockOffset at block header at offset %d; got %d; want %d",
|
|
bsr.prevIndexBlockOffset(), bsr.Block.bh.ValuesBlockOffset, bsr.valuesBlockOffset)
|
|
}
|
|
|
|
// Read timestamps data.
|
|
if usePrevTimestamps {
|
|
bsr.Block.timestampsData = append(bsr.Block.timestampsData[:0], bsr.prevTimestampsData...)
|
|
} else {
|
|
bsr.Block.timestampsData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.timestampsData, int(bsr.Block.bh.TimestampsBlockSize))
|
|
if err := fs.ReadFullData(bsr.timestampsReader, bsr.Block.timestampsData); err != nil {
|
|
return fmt.Errorf("cannot read timestamps block at offset %d: %w", bsr.timestampsBlockOffset, err)
|
|
}
|
|
bsr.prevTimestampsBlockOffset = bsr.timestampsBlockOffset
|
|
bsr.prevTimestampsData = append(bsr.prevTimestampsData[:0], bsr.Block.timestampsData...)
|
|
}
|
|
|
|
// Read values data.
|
|
bsr.Block.valuesData = bytesutil.ResizeNoCopyMayOverallocate(bsr.Block.valuesData, int(bsr.Block.bh.ValuesBlockSize))
|
|
if err := fs.ReadFullData(bsr.valuesReader, bsr.Block.valuesData); err != nil {
|
|
return fmt.Errorf("cannot read values block at offset %d: %w", bsr.valuesBlockOffset, err)
|
|
}
|
|
|
|
// Update offsets.
|
|
if !usePrevTimestamps {
|
|
bsr.timestampsBlockOffset += uint64(bsr.Block.bh.TimestampsBlockSize)
|
|
}
|
|
bsr.valuesBlockOffset += uint64(bsr.Block.bh.ValuesBlockSize)
|
|
bsr.indexBlockHeadersCount++
|
|
|
|
return nil
|
|
}
|
|
|
|
func (bsr *blockStreamReader) readIndexBlock() error {
|
|
// Go to the next metaindex row.
|
|
if len(bsr.mrs) == 0 {
|
|
return io.EOF
|
|
}
|
|
bsr.mr = &bsr.mrs[0]
|
|
bsr.mrs = bsr.mrs[1:]
|
|
|
|
// Validate metaindex row.
|
|
if bsr.indexBlockOffset != bsr.mr.IndexBlockOffset {
|
|
return fmt.Errorf("invalid IndexBlockOffset in metaindex row; got %d; want %d", bsr.mr.IndexBlockOffset, bsr.indexBlockOffset)
|
|
}
|
|
if bsr.mr.MinTimestamp < bsr.ph.MinTimestamp {
|
|
return fmt.Errorf("invalid MinTimesamp in metaindex row; got %d; cannot be smaller than %d", bsr.mr.MinTimestamp, bsr.ph.MinTimestamp)
|
|
}
|
|
if bsr.mr.MaxTimestamp > bsr.ph.MaxTimestamp {
|
|
return fmt.Errorf("invalid MaxTimestamp in metaindex row; got %d; cannot be bigger than %d", bsr.mr.MaxTimestamp, bsr.ph.MaxTimestamp)
|
|
}
|
|
|
|
// Read index block.
|
|
bsr.compressedIndexData = bytesutil.ResizeNoCopyMayOverallocate(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
|
|
if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
|
|
return fmt.Errorf("cannot read index block at offset %d: %w", bsr.indexBlockOffset, err)
|
|
}
|
|
tmpData, err := encoding.DecompressZSTD(bsr.indexData[:0], bsr.compressedIndexData)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decompress index block at offset %d: %w", bsr.indexBlockOffset, err)
|
|
}
|
|
bsr.indexData = tmpData
|
|
bsr.indexCursor = bsr.indexData
|
|
|
|
// Update offsets.
|
|
bsr.indexBlockOffset += uint64(bsr.mr.IndexBlockSize)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (bsr *blockStreamReader) prevIndexBlockOffset() uint64 {
|
|
return bsr.indexBlockOffset - uint64(bsr.mr.IndexBlockSize)
|
|
}
|
|
|
|
func getBlockStreamReader() *blockStreamReader {
|
|
v := bsrPool.Get()
|
|
if v == nil {
|
|
return &blockStreamReader{}
|
|
}
|
|
return v.(*blockStreamReader)
|
|
}
|
|
|
|
func putBlockStreamReader(bsr *blockStreamReader) {
|
|
bsr.MustClose()
|
|
bsrPool.Put(bsr)
|
|
}
|
|
|
|
var bsrPool sync.Pool
|