lib/storage: correctly use maxBlockSize in various checks

Previously, `maxBlockSize` was multiplied by 8 in certain checks. This multiplication is unnecessary.
Aliaksandr Valialkin 2020-09-24 18:12:09 +03:00
parent c584aece38
commit 24ca30bf66
5 changed files with 38 additions and 30 deletions
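
For context, here is the arithmetic behind the relaxed limits as a minimal standalone Go sketch (not part of the commit; only the constant values are taken from the diff below):

package main

import "fmt"

// Constants as defined in lib/storage (see the diff below).
// The old definition maxBlockSize = 8 * maxRowsPerBlock yields the same 64 KiB,
// so multiplying it by another 8 in the size checks only inflated the limits.
const (
	maxRowsPerBlock = 8 * 1024  // maximum number of rows per block
	maxBlockSize    = 64 * 1024 // maximum size of values in a block, in bytes
)

func main() {
	fmt.Println("old check limit:", 2*8*maxBlockSize) // 1048576 bytes (1 MiB)
	fmt.Println("new check limit:", 2*maxBlockSize)   // 131072 bytes (128 KiB)
}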


@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"fmt"
 	"sync"
 	"sync/atomic"
@@ -9,11 +10,11 @@ import (
 )
 
 const (
+	// The maximum size of values in the block.
+	maxBlockSize = 64 * 1024
+
 	// The maximum number of rows per block.
 	maxRowsPerBlock = 8 * 1024
-
-	// The maximum size of values in the block.
-	maxBlockSize = 8 * maxRowsPerBlock
 )
 
 // Block represents a block of time series values for a single TSID.
@@ -259,7 +260,7 @@ func (b *Block) UnmarshalData() error {
 	}
 
 	if b.bh.RowsCount <= 0 {
-		logger.Panicf("BUG: RowsCount must be greater than 0; got %d", b.bh.RowsCount)
+		return fmt.Errorf("RowsCount must be greater than 0; got %d", b.bh.RowsCount)
 	}
 
 	var err error
@@ -281,7 +282,7 @@ func (b *Block) UnmarshalData() error {
 	b.valuesData = b.valuesData[:0]
 
 	if len(b.timestamps) != len(b.values) {
-		logger.Panicf("BUG: timestamps and values count mismatch; got %d vs %d", len(b.timestamps), len(b.values))
+		return fmt.Errorf("timestamps and values count mismatch; got %d vs %d", len(b.timestamps), len(b.values))
 	}
 
 	b.nextIdx = 0


@@ -150,26 +150,33 @@ func (bh *blockHeader) Unmarshal(src []byte) ([]byte, error) {
 	bh.PrecisionBits = uint8(src[0])
 	src = src[1:]
 
-	if bh.RowsCount == 0 {
-		return src, fmt.Errorf("RowsCount in block header cannot be zero")
-	}
-	if err = encoding.CheckMarshalType(bh.TimestampsMarshalType); err != nil {
-		return src, fmt.Errorf("unsupported TimestampsMarshalType: %w", err)
-	}
-	if err = encoding.CheckMarshalType(bh.ValuesMarshalType); err != nil {
-		return src, fmt.Errorf("unsupported ValuesMarshalType: %w", err)
-	}
-	if err = encoding.CheckPrecisionBits(bh.PrecisionBits); err != nil {
-		return src, err
-	}
-	if bh.TimestampsBlockSize > 2*8*maxBlockSize {
-		return src, fmt.Errorf("too big TimestampsBlockSize; got %d; cannot exceed %d", bh.TimestampsBlockSize, 2*8*maxBlockSize)
-	}
-	if bh.ValuesBlockSize > 2*8*maxBlockSize {
-		return src, fmt.Errorf("too big ValuesBlockSize; got %d; cannot exceed %d", bh.ValuesBlockSize, 2*8*maxBlockSize)
-	}
-	return src, nil
+	err = bh.validate()
+	return src, err
+}
+
+func (bh *blockHeader) validate() error {
+	if bh.RowsCount == 0 {
+		return fmt.Errorf("RowsCount in block header cannot be zero")
+	}
+	if bh.RowsCount > 2*maxRowsPerBlock {
+		return fmt.Errorf("too big RowsCount; got %d; cannot exceed %d", bh.RowsCount, 2*maxRowsPerBlock)
+	}
+	if err := encoding.CheckMarshalType(bh.TimestampsMarshalType); err != nil {
+		return fmt.Errorf("unsupported TimestampsMarshalType: %w", err)
+	}
+	if err := encoding.CheckMarshalType(bh.ValuesMarshalType); err != nil {
+		return fmt.Errorf("unsupported ValuesMarshalType: %w", err)
+	}
+	if err := encoding.CheckPrecisionBits(bh.PrecisionBits); err != nil {
+		return err
+	}
+	if bh.TimestampsBlockSize > 2*maxBlockSize {
+		return fmt.Errorf("too big TimestampsBlockSize; got %d; cannot exceed %d", bh.TimestampsBlockSize, 2*maxBlockSize)
+	}
+	if bh.ValuesBlockSize > 2*maxBlockSize {
+		return fmt.Errorf("too big ValuesBlockSize; got %d; cannot exceed %d", bh.ValuesBlockSize, 2*maxBlockSize)
+	}
+	return nil
 }
 
 // unmarshalBlockHeaders unmarshals all the block headers from src,


@@ -25,8 +25,8 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
 		bh.MaxTimestamp = int64(i*2e3 + 3)
 		bh.TimestampsBlockOffset = uint64(i*12345 + 4)
 		bh.ValuesBlockOffset = uint64(i*3243 + 5)
-		bh.TimestampsBlockSize = uint32(i*892 + 6)
-		bh.ValuesBlockSize = uint32(i*894 + 7)
+		bh.TimestampsBlockSize = uint32((i*892 + 6) % maxBlockSize)
+		bh.ValuesBlockSize = uint32((i*894 + 7) % maxBlockSize)
 		bh.RowsCount = uint32(i*3 + 8)
 		bh.Scale = int16(i - 434 + 9)
 		bh.TimestampsMarshalType = encoding.MarshalType((i + 10) % 7)


@@ -120,8 +120,8 @@ func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
 	if mr.BlockHeadersCount <= 0 {
 		return src, fmt.Errorf("BlockHeadersCount must be greater than 0")
 	}
-	if mr.IndexBlockSize > 2*8*maxBlockSize {
-		return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*8*maxBlockSize)
+	if mr.IndexBlockSize > 2*maxBlockSize {
+		return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*maxBlockSize)
 	}
 	return src, nil


@@ -91,8 +91,8 @@ func initTestMetaindexRow(mr *metaindexRow) {
 	if mr.BlockHeadersCount == 0 {
 		mr.BlockHeadersCount = 1
 	}
-	if mr.IndexBlockSize > 2*8*maxBlockSize {
-		mr.IndexBlockSize = 2 * 8 * maxBlockSize
+	if mr.IndexBlockSize > 2*maxBlockSize {
+		mr.IndexBlockSize = 2 * maxBlockSize
 	}
 }