This commit is contained in:
Aliaksandr Valialkin 2024-05-13 22:56:22 +02:00
parent aa21c9492e
commit f0c48e35d3
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 64 additions and 59 deletions

View file

@ -498,11 +498,11 @@ func MarshalBytes(dst, b []byte) []byte {
// UnmarshalBytes returns unmarshaled bytes from src.
func UnmarshalBytes(src []byte) ([]byte, []byte, error) {
tail, n, err := UnmarshalVarUint64(src)
if err != nil {
return nil, nil, fmt.Errorf("cannot unmarshal string size: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return nil, nil, fmt.Errorf("cannot unmarshal string size from uvarint")
}
src = tail
src = src[nSize:]
if uint64(len(src)) < n {
return nil, nil, fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", n, len(src))
}

View file

@ -1,6 +1,7 @@
package logstorage
import (
"encoding/binary"
"fmt"
"math"
"sync"
@ -85,23 +86,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
src = tail
// unmarshal bh.uncompressedSizeBytes
tail, n, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes from uvarint")
}
src = src[nSize:]
bh.uncompressedSizeBytes = n
src = tail
// unmarshal bh.rowsCount
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal rowsCount from uvarint")
}
src = src[nSize:]
if n > maxRowsPerBlock {
return srcOrig, fmt.Errorf("too big value for rowsCount: %d; mustn't exceed %d", n, maxRowsPerBlock)
}
bh.rowsCount = n
src = tail
// unmarshal bh.timestampsHeader
tail, err = bh.timestampsHeader.unmarshal(src)
@ -111,23 +112,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
src = tail
// unmarshal columnsHeaderOffset
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset from uvarint")
}
src = src[nSize:]
bh.columnsHeaderOffset = n
src = tail
// unmarshal columnsHeaderSize
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize from uvarint")
}
src = src[nSize:]
if n > maxColumnsHeaderSize {
return srcOrig, fmt.Errorf("too big value for columnsHeaderSize: %d; mustn't exceed %d", n, maxColumnsHeaderSize)
}
bh.columnsHeaderSize = n
src = tail
return src, nil
}
@ -296,17 +297,17 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
csh.reset()
// unmarshal columnHeaders
tail, n, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeaders len: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return fmt.Errorf("cannot unmarshal columnHeaders len from uvarint")
}
src = src[nSize:]
if n > maxColumnsPerBlock {
return fmt.Errorf("too many column headers: %d; mustn't exceed %d", n, maxColumnsPerBlock)
}
src = tail
chs := csh.resizeColumnHeaders(int(n))
for i := range chs {
tail, err = chs[i].unmarshal(a, src)
tail, err := chs[i].unmarshal(a, src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
}
@ -315,17 +316,17 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
csh.columnHeaders = chs
// unmarshal constColumns
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumns len: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return fmt.Errorf("cannot unmarshal constColumns len from uvarint")
}
src = src[nSize:]
if n+uint64(len(csh.columnHeaders)) > maxColumnsPerBlock {
return fmt.Errorf("too many columns: %d; mustn't exceed %d", n+uint64(len(csh.columnHeaders)), maxColumnsPerBlock)
}
src = tail
ccs := csh.resizeConstColumns(int(n))
for i := range ccs {
tail, err = ccs[i].unmarshal(a, src)
tail, err := ccs[i].unmarshal(a, src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
}
@ -659,22 +660,22 @@ func (ch *columnHeader) unmarshalValuesAndBloomFilters(src []byte) ([]byte, erro
func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) {
srcOrig := src
tail, n, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset from uvarint")
}
src = src[nSize:]
ch.valuesOffset = n
src = tail
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal valuesSize: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal valuesSize from uvarint")
}
src = src[nSize:]
if n > maxValuesBlockSize {
return srcOrig, fmt.Errorf("too big valuesSize: %d bytes; mustn't exceed %d bytes", n, maxValuesBlockSize)
}
ch.valuesSize = n
src = tail
return src, nil
}
@ -682,22 +683,22 @@ func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) {
func (ch *columnHeader) unmarshalBloomFilters(src []byte) ([]byte, error) {
srcOrig := src
tail, n, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset from uvarint")
}
src = src[nSize:]
ch.bloomFilterOffset = n
src = tail
tail, n, err = encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize: %w", err)
n, nSize = binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize from uvarint")
}
src = src[nSize:]
if n > maxBloomFilterBlockSize {
return srcOrig, fmt.Errorf("too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", n, maxBloomFilterBlockSize)
}
ch.bloomFilterSize = n
src = tail
return src, nil
}

View file

@ -1,6 +1,7 @@
package logstorage
import (
"encoding/binary"
"fmt"
"sync"
@ -279,11 +280,11 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
// Compressed block
// Read block length
tail, blockLen, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return dst, src, fmt.Errorf("cannot unmarshal compressed block size: %w", err)
blockLen, nSize := binary.Uvarint(src)
if nSize <= 0 {
return dst, src, fmt.Errorf("cannot unmarshal compressed block size from uvarint")
}
src = tail
src = src[nSize:]
if uint64(len(src)) < blockLen {
return dst, src, fmt.Errorf("cannot read compressed block with the size %d bytes from %d bytes", blockLen, len(src))
}
@ -292,6 +293,7 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
// Decompress the block
bb := bbPool.Get()
var err error
bb.B, err = encoding.DecompressZSTD(bb.B[:0], compressedBlock)
if err != nil {
return dst, src, fmt.Errorf("cannot decompress block: %w", err)

View file

@ -2,6 +2,7 @@ package logstorage
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sort"
@ -507,14 +508,14 @@ func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilte
return nil, false
}
// Cache hit - unpack streamIDs from data.
tail, n, err := encoding.UnmarshalVarUint64(data)
if err != nil {
logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache: %s", err)
n, nSize := binary.Uvarint(data)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal the number of streamIDs from cache")
}
src := tail
src := data[nSize:]
streamIDs := make([]streamID, n)
for i := uint64(0); i < n; i++ {
tail, err = streamIDs[i].unmarshal(src)
tail, err := streamIDs[i].unmarshal(src)
if err != nil {
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
}

View file

@ -2,6 +2,7 @@ package logstorage
import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"strconv"
@ -119,11 +120,11 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
srcOrig := src
tail, n, err := encoding.UnmarshalVarUint64(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal tags len: %w", err)
n, nSize := binary.Uvarint(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal tags len from uvarint")
}
src = tail
src = src[nSize:]
for i := uint64(0); i < n; i++ {
tail, name, err := encoding.UnmarshalBytes(src)
if err != nil {