VictoriaMetrics/lib/logstorage/index_block_header.go

165 lines
5.6 KiB
Go
Raw Normal View History

package logstorage
import (
"fmt"
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// indexBlockHeader contains index information about multiple blocks.
//
// It allows locating the block by streamID and/or by time range.
//
// The marshaled form is the marshaled streamID followed by minTimestamp,
// maxTimestamp, indexBlockOffset and indexBlockSize, each encoded as uint64
// (32 bytes in total after the streamID) - see marshal and unmarshal.
type indexBlockHeader struct {
// streamID is the minimum streamID covered by the indexBlockHeader
streamID streamID
// minTimestamp is the minimum timestamp seen across blocks covered by the indexBlockHeader
minTimestamp int64
// maxTimestamp is the maximum timestamp seen across blocks covered by the indexBlockHeader
maxTimestamp int64
// indexBlockOffset is an offset of the linked index block at indexFilename
//
// mustReadNextIndexBlock requires it to match the number of bytes already read from the index reader.
indexBlockOffset uint64
// indexBlockSize is the size of the linked index block at indexFilename
//
// This is the size of the compressed block as written by mustWriteIndexBlock.
indexBlockSize uint64
}
// reset clears every field of ih, so the header can be re-used.
func (ih *indexBlockHeader) reset() {
	ih.streamID.reset()
	ih.minTimestamp, ih.maxTimestamp = 0, 0
	ih.indexBlockOffset, ih.indexBlockSize = 0, 0
}
// mustWriteIndexBlock writes data with the given additional args to sw and updates ih accordingly.
//
// data is compressed via encoding.CompressZSTDLevel (level argument 1) into a buffer
// borrowed from longTermBufPool, and the compressed bytes are written to sw.indexWriter.
// After the call ih holds sidFirst, minTimestamp and maxTimestamp together with
// the offset and the compressed size of the written block.
func (ih *indexBlockHeader) mustWriteIndexBlock(data []byte, sidFirst streamID, minTimestamp, maxTimestamp int64, sw *streamWriters) {
	compressed := longTermBufPool.Get()
	compressed.B = encoding.CompressZSTDLevel(compressed.B[:0], data, 1)

	ih.streamID, ih.minTimestamp, ih.maxTimestamp = sidFirst, minTimestamp, maxTimestamp

	// The offset is sampled before MustWrite, so it refers to the start of the block
	// (assuming MustWrite advances bytesWritten - confirm in writerWithStats).
	ih.indexBlockOffset, ih.indexBlockSize = sw.indexWriter.bytesWritten, uint64(len(compressed.B))

	sw.indexWriter.MustWrite(compressed.B)
	longTermBufPool.Put(compressed)
}
// mustReadNextIndexBlock reads the next index block associated with ih from sr, appends it to dst and returns the result.
//
// The block is read sequentially from sr.indexReader, so ih.indexBlockOffset must match
// the number of bytes already read from it. A mismatching offset, a block size above
// maxIndexBlockSize or a decompression failure is reported via logger.Panicf.
func (ih *indexBlockHeader) mustReadNextIndexBlock(dst []byte, sr *streamReaders) []byte {
	r := &sr.indexReader
	size := ih.indexBlockSize

	// Validate the header before sizing a buffer from it.
	if size > maxIndexBlockSize {
		logger.Panicf("FATAL: %s: indexBlockHeader.indexBlockSize=%d cannot exceed %d bytes", r.Path(), size, maxIndexBlockSize)
	}
	if r.bytesRead != ih.indexBlockOffset {
		logger.Panicf("FATAL: %s: indexBlockHeader.indexBlockOffset=%d must equal to %d", r.Path(), ih.indexBlockOffset, r.bytesRead)
	}

	// Read the compressed block into a pooled scratch buffer.
	compressed := longTermBufPool.Get()
	compressed.B = bytesutil.ResizeNoCopyMayOverallocate(compressed.B, int(size))
	r.MustReadFull(compressed.B)

	// Decompress to dst; the scratch buffer is returned to the pool before the error check.
	result, err := encoding.DecompressZSTD(dst, compressed.B)
	longTermBufPool.Put(compressed)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", r.Path(), ih.indexBlockOffset, size, err)
	}
	return result
}
// marshal appends the marshaled ih to dst and returns the extended slice.
//
// The layout is the marshaled streamID followed by minTimestamp, maxTimestamp,
// indexBlockOffset and indexBlockSize, each encoded with encoding.MarshalUint64.
func (ih *indexBlockHeader) marshal(dst []byte) []byte {
	dst = ih.streamID.marshal(dst)

	// The order of the fixed-size fields must match the order expected by unmarshal.
	fields := [...]uint64{
		uint64(ih.minTimestamp),
		uint64(ih.maxTimestamp),
		ih.indexBlockOffset,
		ih.indexBlockSize,
	}
	for _, v := range fields {
		dst = encoding.MarshalUint64(dst, v)
	}
	return dst
}
// unmarshal reads ih from the beginning of src and returns the remaining tail.
//
// The expected layout is the marshaled streamID followed by 32 bytes holding
// minTimestamp, maxTimestamp, indexBlockOffset and indexBlockSize.
// On error the original src is returned unchanged together with the error;
// ih may be partially updated in this case.
func (ih *indexBlockHeader) unmarshal(src []byte) ([]byte, error) {
	rest, err := ih.streamID.unmarshal(src)
	if err != nil {
		return src, fmt.Errorf("cannot unmarshal streamID: %w", err)
	}

	// The fixed-size part consists of four uint64 values.
	if n := len(rest); n < 32 {
		return src, fmt.Errorf("cannot unmarshal indexBlockHeader from %d bytes; need at least 32 bytes", n)
	}
	u64At := func(offset int) uint64 {
		return encoding.UnmarshalUint64(rest[offset:])
	}
	ih.minTimestamp = int64(u64At(0))
	ih.maxTimestamp = int64(u64At(8))
	ih.indexBlockOffset = u64At(16)
	ih.indexBlockSize = u64At(24)

	return rest[32:], nil
}
// mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result.
//
// The whole contents of r is read, decompressed with encoding.DecompressZSTD and then parsed
// with unmarshalIndexBlockHeaders. Any failure is reported via logger.Panicf.
func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader {
	compressed, err := io.ReadAll(r)
	if err != nil {
		logger.Panicf("FATAL: cannot read indexBlockHeader entries from %s: %s", r.Path(), err)
	}

	buf := longTermBufPool.Get()
	if buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressed); err != nil {
		logger.Panicf("FATAL: cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err)
	}

	headers, err := unmarshalIndexBlockHeaders(dst, buf.B)

	// Only buffers holding less than 1MiB of data go back to the pool; bigger ones are dropped.
	const maxPooledLen = 1024 * 1024
	if len(buf.B) < maxPooledLen {
		longTermBufPool.Put(buf)
	}

	if err != nil {
		logger.Panicf("FATAL: cannot parse indexBlockHeader entries from %s: %s", r.Path(), err)
	}
	return headers
}
// unmarshalIndexBlockHeaders parses indexBlockHeader entries from src until it is exhausted,
// appends them to dst and returns the result.
//
// Only the newly appended entries are passed to validateIndexBlockHeaders.
// On any error the original dst is returned together with the error.
func unmarshalIndexBlockHeaders(dst []indexBlockHeader, src []byte) ([]indexBlockHeader, error) {
	headers := dst
	for len(src) > 0 {
		// Grow headers by one entry, re-using spare capacity when it is available.
		n := len(headers)
		if n == cap(headers) {
			headers = append(headers, indexBlockHeader{})
		} else {
			headers = headers[:n+1]
		}

		tail, err := headers[n].unmarshal(src)
		if err != nil {
			return dst, fmt.Errorf("cannot unmarshal indexBlockHeader %d: %w", len(headers)-len(dst), err)
		}
		src = tail
	}

	if err := validateIndexBlockHeaders(headers[len(dst):]); err != nil {
		return dst, err
	}
	return headers, nil
}
// validateIndexBlockHeaders verifies that ihs are ordered by non-decreasing streamID.
//
// It returns an error describing the first entry whose streamID is smaller
// than the streamID of the preceding entry. No other fields are checked.
func validateIndexBlockHeaders(ihs []indexBlockHeader) error {
	for i := range ihs {
		if i == 0 {
			// The first entry has no predecessor to compare with.
			continue
		}
		prev, cur := &ihs[i-1].streamID, &ihs[i].streamID
		if cur.less(prev) {
			return fmt.Errorf("unexpected indexBlockHeader with smaller streamID=%s after bigger streamID=%s", cur, prev)
		}
	}
	return nil
}