mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/encoding: optimize UnmarshalVarUint64, UnmarshalVarInt64 and UnmarshalBytes a bit
Change the return values for these functions - now they return the unmarshaled result plus the size of the unmarshaled result in bytes, so the caller could re-slice the src for further unmarshaling. This improves performance of these functions in hot loops of VictoriaLogs a bit.
This commit is contained in:
parent
2e12119a9e
commit
32193b6059
21 changed files with 282 additions and 243 deletions
|
@ -231,12 +231,12 @@ func unmarshalInt64Array(dst []int64, src []byte, mt MarshalType, firstValue int
|
||||||
return dst, nil
|
return dst, nil
|
||||||
case MarshalTypeDeltaConst:
|
case MarshalTypeDeltaConst:
|
||||||
v := firstValue
|
v := firstValue
|
||||||
tail, d, err := UnmarshalVarInt64(src)
|
d, nLen := UnmarshalVarInt64(src)
|
||||||
if err != nil {
|
if nLen <= 0 {
|
||||||
return nil, fmt.Errorf("cannot unmarshal delta value for delta const: %w", err)
|
return nil, fmt.Errorf("cannot unmarshal delta value for delta const: %w", err)
|
||||||
}
|
}
|
||||||
if len(tail) > 0 {
|
if nLen < len(src) {
|
||||||
return nil, fmt.Errorf("unexpected trailing data after delta const (d=%d): %d bytes", d, len(tail))
|
return nil, fmt.Errorf("unexpected trailing data after delta const (d=%d): %d bytes", d, len(src)-nLen)
|
||||||
}
|
}
|
||||||
for itemsCount > 0 {
|
for itemsCount > 0 {
|
||||||
dst = append(dst, v)
|
dst = append(dst, v)
|
||||||
|
|
|
@ -167,16 +167,15 @@ func marshalVarInt64sSlow(dst []byte, vs []int64) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalVarInt64 returns unmarshaled int64 from src and returns the remaining tail from src.
|
// UnmarshalVarInt64 returns unmarshaled int64 from src and its size in bytes.
|
||||||
func UnmarshalVarInt64(src []byte) ([]byte, int64, error) {
|
//
|
||||||
|
// It returns 0 or negative value if it cannot unmarshal int64 from src.
|
||||||
|
func UnmarshalVarInt64(src []byte) (int64, int) {
|
||||||
// TODO substitute binary.Uvarint with binary.Varint when benchmark results will show it is faster.
|
// TODO substitute binary.Uvarint with binary.Varint when benchmark results will show it is faster.
|
||||||
// It is slower on amd64/linux Go1.22.
|
// It is slower on amd64/linux Go1.22.
|
||||||
u64, offset := binary.Uvarint(src)
|
u64, nSize := binary.Uvarint(src)
|
||||||
if offset <= 0 {
|
|
||||||
return src, 0, fmt.Errorf("cannot unmarshal varint")
|
|
||||||
}
|
|
||||||
i64 := int64(int64(u64>>1) ^ (int64(u64<<63) >> 63))
|
i64 := int64(int64(u64>>1) ^ (int64(u64<<63) >> 63))
|
||||||
return src[offset:], i64, nil
|
return i64, nSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalVarInt64s unmarshals len(dst) int64 values from src to dst and returns the remaining tail from src.
|
// UnmarshalVarInt64s unmarshals len(dst) int64 values from src to dst and returns the remaining tail from src.
|
||||||
|
@ -363,13 +362,27 @@ func marshalVarUint64sSlow(dst []byte, us []uint64) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalVarUint64 returns unmarshaled uint64 from src and returns the remaining tail from src.
|
// UnmarshalVarUint64 returns unmarshaled uint64 from src and its size in bytes.
|
||||||
func UnmarshalVarUint64(src []byte) ([]byte, uint64, error) {
|
//
|
||||||
u64, offset := binary.Uvarint(src)
|
// It returns 0 or negative value if it cannot unmarshal uint64 from src.
|
||||||
if offset <= 0 {
|
func UnmarshalVarUint64(src []byte) (uint64, int) {
|
||||||
return src, 0, fmt.Errorf("cannot read varuint")
|
if len(src) == 0 {
|
||||||
|
return 0, 0
|
||||||
}
|
}
|
||||||
return src[offset:], u64, nil
|
if src[0] < 0x80 {
|
||||||
|
// Fast path for a single byte
|
||||||
|
return uint64(src[0]), 1
|
||||||
|
}
|
||||||
|
if len(src) == 1 {
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
if src[1] < 0x80 {
|
||||||
|
// Fast path for two bytes
|
||||||
|
return uint64(src[0]&0x7f) | uint64(src[1])<<7, 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path for other number of bytes
|
||||||
|
return binary.Uvarint(src)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalVarUint64s unmarshals len(dst) uint64 values from src to dst and returns the remaining tail from src.
|
// UnmarshalVarUint64s unmarshals len(dst) uint64 values from src to dst and returns the remaining tail from src.
|
||||||
|
@ -496,17 +509,20 @@ func MarshalBytes(dst, b []byte) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalBytes returns unmarshaled bytes from src.
|
// UnmarshalBytes returns unmarshaled bytes from src and the size of the unmarshaled bytes.
|
||||||
func UnmarshalBytes(src []byte) ([]byte, []byte, error) {
|
//
|
||||||
tail, n, err := UnmarshalVarUint64(src)
|
// It returns 0 or negative value if it is impossible to unmarshal bytes from src.
|
||||||
if err != nil {
|
func UnmarshalBytes(src []byte) ([]byte, int) {
|
||||||
return nil, nil, fmt.Errorf("cannot unmarshal string size: %w", err)
|
n, nSize := UnmarshalVarUint64(src)
|
||||||
|
if nSize <= 0 {
|
||||||
|
return nil, 0
|
||||||
}
|
}
|
||||||
src = tail
|
if uint64(nSize)+n > uint64(len(src)) {
|
||||||
if uint64(len(src)) < n {
|
return nil, 0
|
||||||
return nil, nil, fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", n, len(src))
|
|
||||||
}
|
}
|
||||||
return src[n:], src[:n], nil
|
start := nSize
|
||||||
|
nSize += int(n)
|
||||||
|
return src[start:nSize], nSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetInt64s returns an int64 slice with the given size.
|
// GetInt64s returns an int64 slice with the given size.
|
||||||
|
|
|
@ -224,10 +224,11 @@ func testMarshalUnmarshalVarInt64(t *testing.T, v int64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
b := MarshalVarInt64(nil, v)
|
b := MarshalVarInt64(nil, v)
|
||||||
tail, vNew, err := UnmarshalVarInt64(b)
|
vNew, nSize := UnmarshalVarInt64(b)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
t.Fatalf("unexpected error when unmarshaling v=%d from b=%x: %s", v, b, err)
|
t.Fatalf("unexpected error when unmarshaling v=%d from b=%x", v, b)
|
||||||
}
|
}
|
||||||
|
tail := b[nSize:]
|
||||||
if vNew != v {
|
if vNew != v {
|
||||||
t.Fatalf("unexpected vNew from b=%x; got %d; expecting %d", b, vNew, v)
|
t.Fatalf("unexpected vNew from b=%x; got %d; expecting %d", b, vNew, v)
|
||||||
}
|
}
|
||||||
|
@ -272,10 +273,11 @@ func testMarshalUnmarshalVarUint64(t *testing.T, u uint64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
b := MarshalVarUint64(nil, u)
|
b := MarshalVarUint64(nil, u)
|
||||||
tail, uNew, err := UnmarshalVarUint64(b)
|
uNew, nSize := UnmarshalVarUint64(b)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
t.Fatalf("unexpected error when unmarshaling u=%d from b=%x: %s", u, b, err)
|
t.Fatalf("unexpected error when unmarshaling u=%d from b=%x", u, b)
|
||||||
}
|
}
|
||||||
|
tail := b[nSize:]
|
||||||
if uNew != u {
|
if uNew != u {
|
||||||
t.Fatalf("unexpected uNew from b=%x; got %d; expecting %d", b, uNew, u)
|
t.Fatalf("unexpected uNew from b=%x; got %d; expecting %d", b, uNew, u)
|
||||||
}
|
}
|
||||||
|
@ -310,10 +312,11 @@ func testMarshalUnmarshalBytes(t *testing.T, s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
b := MarshalBytes(nil, []byte(s))
|
b := MarshalBytes(nil, []byte(s))
|
||||||
tail, bNew, err := UnmarshalBytes(b)
|
bNew, nSize := UnmarshalBytes(b)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
t.Fatalf("unexpected error when unmarshaling s=%q from b=%x: %s", s, b, err)
|
t.Fatalf("unexpected error when unmarshaling s=%q from b=%x", s, b)
|
||||||
}
|
}
|
||||||
|
tail := b[nSize:]
|
||||||
if string(bNew) != s {
|
if string(bNew) != s {
|
||||||
t.Fatalf("unexpected sNew from b=%x; got %q; expecting %q", b, bNew, s)
|
t.Fatalf("unexpected sNew from b=%x; got %q; expecting %q", b, bNew, s)
|
||||||
}
|
}
|
||||||
|
|
|
@ -173,12 +173,12 @@ func benchmarkUnmarshalVarUint64(b *testing.B, maxValue uint64) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
src := data
|
src := data
|
||||||
for len(src) > 0 {
|
for len(src) > 0 {
|
||||||
tail, n, err := UnmarshalVarUint64(src)
|
n, nSize := UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
panic(fmt.Errorf("unexpected error: %w", err))
|
panic(fmt.Errorf("unexpected error"))
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
sink += n
|
sink += n
|
||||||
src = tail
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Sink.Add(sink)
|
Sink.Add(sink)
|
||||||
|
@ -265,12 +265,12 @@ func benchmarkUnmarshalVarInt64(b *testing.B, maxValue int64) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
src := data
|
src := data
|
||||||
for len(src) > 0 {
|
for len(src) > 0 {
|
||||||
tail, n, err := UnmarshalVarInt64(src)
|
n, nSize := UnmarshalVarInt64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
panic(fmt.Errorf("unexpected error: %w", err))
|
panic(fmt.Errorf("unexpected error"))
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
sink += uint64(n)
|
sink += uint64(n)
|
||||||
src = tail
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Sink.Add(sink)
|
Sink.Add(sink)
|
||||||
|
|
|
@ -85,23 +85,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
|
||||||
src = tail
|
src = tail
|
||||||
|
|
||||||
// unmarshal bh.uncompressedSizeBytes
|
// unmarshal bh.uncompressedSizeBytes
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.uncompressedSizeBytes = n
|
bh.uncompressedSizeBytes = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
// unmarshal bh.rowsCount
|
// unmarshal bh.rowsCount
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal rowsCount")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n > maxRowsPerBlock {
|
if n > maxRowsPerBlock {
|
||||||
return srcOrig, fmt.Errorf("too big value for rowsCount: %d; mustn't exceed %d", n, maxRowsPerBlock)
|
return srcOrig, fmt.Errorf("too big value for rowsCount: %d; mustn't exceed %d", n, maxRowsPerBlock)
|
||||||
}
|
}
|
||||||
bh.rowsCount = n
|
bh.rowsCount = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
// unmarshal bh.timestampsHeader
|
// unmarshal bh.timestampsHeader
|
||||||
tail, err = bh.timestampsHeader.unmarshal(src)
|
tail, err = bh.timestampsHeader.unmarshal(src)
|
||||||
|
@ -111,23 +111,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
|
||||||
src = tail
|
src = tail
|
||||||
|
|
||||||
// unmarshal columnsHeaderOffset
|
// unmarshal columnsHeaderOffset
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.columnsHeaderOffset = n
|
bh.columnsHeaderOffset = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
// unmarshal columnsHeaderSize
|
// unmarshal columnsHeaderSize
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n > maxColumnsHeaderSize {
|
if n > maxColumnsHeaderSize {
|
||||||
return srcOrig, fmt.Errorf("too big value for columnsHeaderSize: %d; mustn't exceed %d", n, maxColumnsHeaderSize)
|
return srcOrig, fmt.Errorf("too big value for columnsHeaderSize: %d; mustn't exceed %d", n, maxColumnsHeaderSize)
|
||||||
}
|
}
|
||||||
bh.columnsHeaderSize = n
|
bh.columnsHeaderSize = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
@ -296,17 +296,18 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
|
||||||
csh.reset()
|
csh.reset()
|
||||||
|
|
||||||
// unmarshal columnHeaders
|
// unmarshal columnHeaders
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return fmt.Errorf("cannot unmarshal columnHeaders len: %w", err)
|
return fmt.Errorf("cannot unmarshal columnHeaders len")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n > maxColumnsPerBlock {
|
if n > maxColumnsPerBlock {
|
||||||
return fmt.Errorf("too many column headers: %d; mustn't exceed %d", n, maxColumnsPerBlock)
|
return fmt.Errorf("too many column headers: %d; mustn't exceed %d", n, maxColumnsPerBlock)
|
||||||
}
|
}
|
||||||
src = tail
|
|
||||||
chs := csh.resizeColumnHeaders(int(n))
|
chs := csh.resizeColumnHeaders(int(n))
|
||||||
for i := range chs {
|
for i := range chs {
|
||||||
tail, err = chs[i].unmarshal(a, src)
|
tail, err := chs[i].unmarshal(a, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
|
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
|
||||||
}
|
}
|
||||||
|
@ -315,17 +316,18 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
|
||||||
csh.columnHeaders = chs
|
csh.columnHeaders = chs
|
||||||
|
|
||||||
// unmarshal constColumns
|
// unmarshal constColumns
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return fmt.Errorf("cannot unmarshal constColumns len: %w", err)
|
return fmt.Errorf("cannot unmarshal constColumns len")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n+uint64(len(csh.columnHeaders)) > maxColumnsPerBlock {
|
if n+uint64(len(csh.columnHeaders)) > maxColumnsPerBlock {
|
||||||
return fmt.Errorf("too many columns: %d; mustn't exceed %d", n+uint64(len(csh.columnHeaders)), maxColumnsPerBlock)
|
return fmt.Errorf("too many columns: %d; mustn't exceed %d", n+uint64(len(csh.columnHeaders)), maxColumnsPerBlock)
|
||||||
}
|
}
|
||||||
src = tail
|
|
||||||
ccs := csh.resizeConstColumns(int(n))
|
ccs := csh.resizeConstColumns(int(n))
|
||||||
for i := range ccs {
|
for i := range ccs {
|
||||||
tail, err = ccs[i].unmarshal(a, src)
|
tail, err := ccs[i].unmarshal(a, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
|
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
|
||||||
}
|
}
|
||||||
|
@ -504,12 +506,12 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
srcOrig := src
|
srcOrig := src
|
||||||
|
|
||||||
// Unmarshal column name
|
// Unmarshal column name
|
||||||
tail, data, err := encoding.UnmarshalBytes(src)
|
data, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal column name: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal column name")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
ch.name = a.copyBytesToString(data)
|
ch.name = a.copyBytesToString(data)
|
||||||
src = tail
|
|
||||||
|
|
||||||
// Unmarshal value type
|
// Unmarshal value type
|
||||||
if len(src) < 1 {
|
if len(src) < 1 {
|
||||||
|
@ -521,13 +523,13 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
// Unmarshal the rest of data depending on valueType
|
// Unmarshal the rest of data depending on valueType
|
||||||
switch ch.valueType {
|
switch ch.valueType {
|
||||||
case valueTypeString:
|
case valueTypeString:
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeString for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeString for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
src = tail
|
src = tail
|
||||||
case valueTypeDict:
|
case valueTypeDict:
|
||||||
tail, err = ch.valuesDict.unmarshal(a, src)
|
tail, err := ch.valuesDict.unmarshal(a, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -546,7 +548,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = uint64(src[1])
|
ch.maxValue = uint64(src[1])
|
||||||
src = src[2:]
|
src = src[2:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint8 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint8 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -559,7 +561,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = uint64(encoding.UnmarshalUint16(src[2:]))
|
ch.maxValue = uint64(encoding.UnmarshalUint16(src[2:]))
|
||||||
src = src[4:]
|
src = src[4:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint16 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint16 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -572,7 +574,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
|
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
|
||||||
src = src[8:]
|
src = src[8:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint32 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint32 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -585,7 +587,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
||||||
src = src[16:]
|
src = src[16:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint64 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint64 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -599,7 +601,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
||||||
src = src[16:]
|
src = src[16:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeFloat64 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeFloat64 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -612,7 +614,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
|
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
|
||||||
src = src[8:]
|
src = src[8:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeIPv4 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeIPv4 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -626,7 +628,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
ch.maxValue = encoding.UnmarshalUint64(src[8:])
|
||||||
src = src[16:]
|
src = src[16:]
|
||||||
|
|
||||||
tail, err = ch.unmarshalValuesAndBloomFilters(src)
|
tail, err := ch.unmarshalValuesAndBloomFilters(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeTimestampISO8601 for column %q: %w", ch.name, err)
|
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeTimestampISO8601 for column %q: %w", ch.name, err)
|
||||||
}
|
}
|
||||||
|
@ -659,22 +661,22 @@ func (ch *columnHeader) unmarshalValuesAndBloomFilters(src []byte) ([]byte, erro
|
||||||
func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) {
|
func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) {
|
||||||
srcOrig := src
|
srcOrig := src
|
||||||
|
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
ch.valuesOffset = n
|
ch.valuesOffset = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal valuesSize: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal valuesSize")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n > maxValuesBlockSize {
|
if n > maxValuesBlockSize {
|
||||||
return srcOrig, fmt.Errorf("too big valuesSize: %d bytes; mustn't exceed %d bytes", n, maxValuesBlockSize)
|
return srcOrig, fmt.Errorf("too big valuesSize: %d bytes; mustn't exceed %d bytes", n, maxValuesBlockSize)
|
||||||
}
|
}
|
||||||
ch.valuesSize = n
|
ch.valuesSize = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
@ -682,22 +684,22 @@ func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) {
|
||||||
func (ch *columnHeader) unmarshalBloomFilters(src []byte) ([]byte, error) {
|
func (ch *columnHeader) unmarshalBloomFilters(src []byte) ([]byte, error) {
|
||||||
srcOrig := src
|
srcOrig := src
|
||||||
|
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
ch.bloomFilterOffset = n
|
ch.bloomFilterOffset = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, n, err = encoding.UnmarshalVarUint64(src)
|
n, nSize = encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if n > maxBloomFilterBlockSize {
|
if n > maxBloomFilterBlockSize {
|
||||||
return srcOrig, fmt.Errorf("too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", n, maxBloomFilterBlockSize)
|
return srcOrig, fmt.Errorf("too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", n, maxBloomFilterBlockSize)
|
||||||
}
|
}
|
||||||
ch.bloomFilterSize = n
|
ch.bloomFilterSize = n
|
||||||
src = tail
|
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -279,11 +279,11 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
|
||||||
// Compressed block
|
// Compressed block
|
||||||
|
|
||||||
// Read block length
|
// Read block length
|
||||||
tail, blockLen, err := encoding.UnmarshalVarUint64(src)
|
blockLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return dst, src, fmt.Errorf("cannot unmarshal compressed block size: %w", err)
|
return dst, src, fmt.Errorf("cannot unmarshal compressed block size")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
if uint64(len(src)) < blockLen {
|
if uint64(len(src)) < blockLen {
|
||||||
return dst, src, fmt.Errorf("cannot read compressed block with the size %d bytes from %d bytes", blockLen, len(src))
|
return dst, src, fmt.Errorf("cannot read compressed block with the size %d bytes from %d bytes", blockLen, len(src))
|
||||||
}
|
}
|
||||||
|
@ -292,6 +292,7 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
|
||||||
|
|
||||||
// Decompress the block
|
// Decompress the block
|
||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
|
var err error
|
||||||
bb.B, err = encoding.DecompressZSTD(bb.B[:0], compressedBlock)
|
bb.B, err = encoding.DecompressZSTD(bb.B[:0], compressedBlock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dst, src, fmt.Errorf("cannot decompress block: %w", err)
|
return dst, src, fmt.Errorf("cannot decompress block: %w", err)
|
||||||
|
|
|
@ -507,14 +507,14 @@ func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilte
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
// Cache hit - unpack streamIDs from data.
|
// Cache hit - unpack streamIDs from data.
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(data)
|
n, nSize := encoding.UnmarshalVarUint64(data)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache: %s", err)
|
logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache")
|
||||||
}
|
}
|
||||||
src := tail
|
src := data[nSize:]
|
||||||
streamIDs := make([]streamID, n)
|
streamIDs := make([]streamID, n)
|
||||||
for i := uint64(0); i < n; i++ {
|
for i := uint64(0); i < n; i++ {
|
||||||
tail, err = streamIDs[i].unmarshal(src)
|
tail, err := streamIDs[i].unmarshal(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
|
logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -398,12 +398,12 @@ func (psp *pipeStatsProcessor) flush() error {
|
||||||
values = values[:0]
|
values = values[:0]
|
||||||
keyBuf := bytesutil.ToUnsafeBytes(key)
|
keyBuf := bytesutil.ToUnsafeBytes(key)
|
||||||
for len(keyBuf) > 0 {
|
for len(keyBuf) > 0 {
|
||||||
tail, v, err := encoding.UnmarshalBytes(keyBuf)
|
v, nSize := encoding.UnmarshalBytes(keyBuf)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q: %w", keyBuf, err)
|
logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", keyBuf)
|
||||||
}
|
}
|
||||||
|
keyBuf = keyBuf[nSize:]
|
||||||
values = append(values, bytesutil.ToUnsafeString(v))
|
values = append(values, bytesutil.ToUnsafeString(v))
|
||||||
keyBuf = tail
|
|
||||||
}
|
}
|
||||||
if len(values) != len(byFields) {
|
if len(values) != len(byFields) {
|
||||||
logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
|
logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
|
||||||
|
|
|
@ -238,17 +238,17 @@ func (pup *pipeUniqProcessor) flush() error {
|
||||||
rowFields = rowFields[:0]
|
rowFields = rowFields[:0]
|
||||||
keyBuf := bytesutil.ToUnsafeBytes(k)
|
keyBuf := bytesutil.ToUnsafeBytes(k)
|
||||||
for len(keyBuf) > 0 {
|
for len(keyBuf) > 0 {
|
||||||
tail, name, err := encoding.UnmarshalBytes(keyBuf)
|
name, nSize := encoding.UnmarshalBytes(keyBuf)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal field name: %s", err)
|
logger.Panicf("BUG: cannot unmarshal field name")
|
||||||
}
|
}
|
||||||
keyBuf = tail
|
keyBuf = keyBuf[nSize:]
|
||||||
|
|
||||||
tail, value, err := encoding.UnmarshalBytes(keyBuf)
|
value, nSize := encoding.UnmarshalBytes(keyBuf)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
|
logger.Panicf("BUG: cannot unmarshal field value")
|
||||||
}
|
}
|
||||||
keyBuf = tail
|
keyBuf = keyBuf[nSize:]
|
||||||
|
|
||||||
rowFields = append(rowFields, Field{
|
rowFields = append(rowFields, Field{
|
||||||
Name: bytesutil.ToUnsafeString(name),
|
Name: bytesutil.ToUnsafeString(name),
|
||||||
|
@ -269,11 +269,11 @@ func (pup *pipeUniqProcessor) flush() error {
|
||||||
keyBuf := bytesutil.ToUnsafeBytes(k)
|
keyBuf := bytesutil.ToUnsafeBytes(k)
|
||||||
fieldIdx := 0
|
fieldIdx := 0
|
||||||
for len(keyBuf) > 0 {
|
for len(keyBuf) > 0 {
|
||||||
tail, value, err := encoding.UnmarshalBytes(keyBuf)
|
value, nSize := encoding.UnmarshalBytes(keyBuf)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
|
logger.Panicf("BUG: cannot unmarshal field value")
|
||||||
}
|
}
|
||||||
keyBuf = tail
|
keyBuf = keyBuf[nSize:]
|
||||||
|
|
||||||
rowFields = append(rowFields, Field{
|
rowFields = append(rowFields, Field{
|
||||||
Name: byFields[fieldIdx],
|
Name: byFields[fieldIdx],
|
||||||
|
|
|
@ -38,20 +38,20 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
srcOrig := src
|
srcOrig := src
|
||||||
|
|
||||||
// Unmarshal field name
|
// Unmarshal field name
|
||||||
tail, b, err := encoding.UnmarshalBytes(src)
|
b, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal field name")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
f.Name = a.copyBytesToString(b)
|
f.Name = a.copyBytesToString(b)
|
||||||
src = tail
|
|
||||||
|
|
||||||
// Unmarshal field value
|
// Unmarshal field value
|
||||||
tail, b, err = encoding.UnmarshalBytes(src)
|
b, nSize = encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal field value")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
f.Value = a.copyBytesToString(b)
|
f.Value = a.copyBytesToString(b)
|
||||||
src = tail
|
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,23 +119,23 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
|
||||||
|
|
||||||
srcOrig := src
|
srcOrig := src
|
||||||
|
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal tags len: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal tags len")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
for i := uint64(0); i < n; i++ {
|
for i := uint64(0); i < n; i++ {
|
||||||
tail, name, err := encoding.UnmarshalBytes(src)
|
name, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal tag name: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal tag name")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
|
|
||||||
tail, value, err := encoding.UnmarshalBytes(src)
|
value, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot unmarshal tag value: %w", err)
|
return srcOrig, fmt.Errorf("cannot unmarshal tag value")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
|
|
||||||
sName := bytesutil.ToUnsafeString(name)
|
sName := bytesutil.ToUnsafeString(name)
|
||||||
sValue := bytesutil.ToUnsafeString(value)
|
sValue := bytesutil.ToUnsafeString(value)
|
||||||
|
|
|
@ -1137,11 +1137,11 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
|
||||||
dictLen := int(src[0])
|
dictLen := int(src[0])
|
||||||
src = src[1:]
|
src = src[1:]
|
||||||
for i := 0; i < dictLen; i++ {
|
for i := 0; i < dictLen; i++ {
|
||||||
tail, data, err := encoding.UnmarshalBytes(src)
|
data, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err)
|
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict", i, dictLen)
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
|
|
||||||
v := a.copyBytesToString(data)
|
v := a.copyBytesToString(data)
|
||||||
vd.values = append(vd.values, v)
|
vd.values = append(vd.values, v)
|
||||||
|
|
|
@ -77,20 +77,20 @@ func (bh *blockHeader) Marshal(dst []byte) []byte {
|
||||||
func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) {
|
func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) {
|
||||||
bh.noCopy = true
|
bh.noCopy = true
|
||||||
// Unmarshal commonPrefix
|
// Unmarshal commonPrefix
|
||||||
tail, cp, err := encoding.UnmarshalBytes(src)
|
cp, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err)
|
return src, fmt.Errorf("cannot unmarshal commonPrefix")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.commonPrefix = cp[:len(cp):len(cp)]
|
bh.commonPrefix = cp[:len(cp):len(cp)]
|
||||||
src = tail
|
|
||||||
|
|
||||||
// Unmarshal firstItem
|
// Unmarshal firstItem
|
||||||
tail, fi, err := encoding.UnmarshalBytes(src)
|
fi, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
|
return src, fmt.Errorf("cannot unmarshal firstItem")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.firstItem = fi[:len(fi):len(fi)]
|
bh.firstItem = fi[:len(fi):len(fi)]
|
||||||
src = tail
|
|
||||||
|
|
||||||
// Unmarshal marshalType
|
// Unmarshal marshalType
|
||||||
if len(src) == 0 {
|
if len(src) == 0 {
|
||||||
|
|
|
@ -41,12 +41,12 @@ func (mr *metaindexRow) Marshal(dst []byte) []byte {
|
||||||
|
|
||||||
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
|
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
|
||||||
// Unmarshal firstItem
|
// Unmarshal firstItem
|
||||||
tail, fi, err := encoding.UnmarshalBytes(src)
|
fi, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
|
return src, fmt.Errorf("cannot unmarshal firstItem")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
mr.firstItem = append(mr.firstItem[:0], fi...)
|
mr.firstItem = append(mr.firstItem[:0], fi...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
// Unmarshal blockHeadersCount
|
// Unmarshal blockHeadersCount
|
||||||
if len(src) < 4 {
|
if len(src) < 4 {
|
||||||
|
|
|
@ -91,10 +91,11 @@ func cloneLabel(label prompbmarshal.Label) prompbmarshal.Label {
|
||||||
//
|
//
|
||||||
// It is safe calling Decompress from concurrent goroutines.
|
// It is safe calling Decompress from concurrent goroutines.
|
||||||
func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label {
|
func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label {
|
||||||
tail, labelsLen, err := encoding.UnmarshalVarUint64(src)
|
labelsLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal labels length: %s", err)
|
logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
|
||||||
}
|
}
|
||||||
|
tail := src[nSize:]
|
||||||
if labelsLen == 0 {
|
if labelsLen == 0 {
|
||||||
// fast path - nothing to decode
|
// fast path - nothing to decode
|
||||||
if len(tail) > 0 {
|
if len(tail) > 0 {
|
||||||
|
@ -104,6 +105,7 @@ func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []
|
||||||
}
|
}
|
||||||
|
|
||||||
a := encoding.GetUint64s(int(labelsLen))
|
a := encoding.GetUint64s(int(labelsLen))
|
||||||
|
var err error
|
||||||
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
|
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
|
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
|
||||||
|
|
|
@ -369,15 +369,18 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return src, err
|
return src, err
|
||||||
}
|
}
|
||||||
src, timestampsData, err := encoding.UnmarshalBytes(src)
|
timestampsData, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot read timestampsData: %w", err)
|
return src, fmt.Errorf("cannot read timestampsData")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
b.timestampsData = append(b.timestampsData[:0], timestampsData...)
|
b.timestampsData = append(b.timestampsData[:0], timestampsData...)
|
||||||
src, valuesData, err := encoding.UnmarshalBytes(src)
|
|
||||||
if err != nil {
|
valuesData, nSize := encoding.UnmarshalBytes(src)
|
||||||
return src, fmt.Errorf("cannot read valuesData: %w", err)
|
if nSize <= 0 {
|
||||||
|
return src, fmt.Errorf("cannot read valuesData")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
b.valuesData = append(b.valuesData[:0], valuesData...)
|
b.valuesData = append(b.valuesData[:0], valuesData...)
|
||||||
|
|
||||||
if err := b.bh.validate(); err != nil {
|
if err := b.bh.validate(); err != nil {
|
||||||
|
|
|
@ -167,33 +167,42 @@ func (bh *blockHeader) marshalPortable(dst []byte) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
|
func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
|
||||||
src, minTimestamp, err := encoding.UnmarshalVarInt64(src)
|
minTimestamp, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
|
return src, fmt.Errorf("cannot unmarshal firstTimestamp from varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.MinTimestamp = minTimestamp
|
bh.MinTimestamp = minTimestamp
|
||||||
src, maxTimestamp, err := encoding.UnmarshalVarInt64(src)
|
|
||||||
if err != nil {
|
maxTimestamp, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err)
|
if nSize <= 0 {
|
||||||
|
return src, fmt.Errorf("cannot unmarshal firstTimestamp rom varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.MaxTimestamp = maxTimestamp
|
bh.MaxTimestamp = maxTimestamp
|
||||||
src, firstValue, err := encoding.UnmarshalVarInt64(src)
|
|
||||||
if err != nil {
|
firstValue, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
return src, fmt.Errorf("cannot unmarshal firstValue: %w", err)
|
if nSize <= 0 {
|
||||||
|
return src, fmt.Errorf("cannot unmarshal firstValue from varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
bh.FirstValue = firstValue
|
bh.FirstValue = firstValue
|
||||||
src, rowsCount, err := encoding.UnmarshalVarUint64(src)
|
|
||||||
if err != nil {
|
rowsCount, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err)
|
if nSize <= 0 {
|
||||||
|
return src, fmt.Errorf("cannot unmarshal rowsCount from varuint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if rowsCount > math.MaxUint32 {
|
if rowsCount > math.MaxUint32 {
|
||||||
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
|
return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32))
|
||||||
}
|
}
|
||||||
bh.RowsCount = uint32(rowsCount)
|
bh.RowsCount = uint32(rowsCount)
|
||||||
src, scale, err := encoding.UnmarshalVarInt64(src)
|
|
||||||
if err != nil {
|
scale, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
return src, fmt.Errorf("cannot unmarshal scale: %w", err)
|
if nSize <= 0 {
|
||||||
|
return src, fmt.Errorf("cannot unmarshal scale from varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
if scale < math.MinInt16 {
|
if scale < math.MinInt16 {
|
||||||
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
|
return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16)
|
||||||
}
|
}
|
||||||
|
@ -204,6 +213,7 @@ func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) {
|
||||||
if len(src) < 1 {
|
if len(src) < 1 {
|
||||||
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
|
return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
bh.TimestampsMarshalType = encoding.MarshalType(src[0])
|
bh.TimestampsMarshalType = encoding.MarshalType(src[0])
|
||||||
src = src[1:]
|
src = src[1:]
|
||||||
if len(src) < 1 {
|
if len(src) < 1 {
|
||||||
|
|
|
@ -2170,11 +2170,11 @@ func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tagKey = tagKey[1:]
|
tagKey = tagKey[1:]
|
||||||
var nameLen uint64
|
nameLen, nSize := encoding.UnmarshalVarUint64(tagKey)
|
||||||
tagKey, nameLen, err = encoding.UnmarshalVarUint64(tagKey)
|
if nSize <= 0 {
|
||||||
if err != nil {
|
logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q", tagKey)
|
||||||
logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err)
|
|
||||||
}
|
}
|
||||||
|
tagKey = tagKey[nSize:]
|
||||||
if nameLen == 0 {
|
if nameLen == 0 {
|
||||||
logger.Panicf("BUG: nameLen must be greater than 0")
|
logger.Panicf("BUG: nameLen must be greater than 0")
|
||||||
}
|
}
|
||||||
|
@ -2987,11 +2987,11 @@ func unmarshalCompositeTagKey(src []byte) ([]byte, []byte, error) {
|
||||||
return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src)
|
return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src)
|
||||||
}
|
}
|
||||||
src = src[1:]
|
src = src[1:]
|
||||||
tail, n, err := encoding.UnmarshalVarUint64(src)
|
n, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key: %w", err)
|
return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
if uint64(len(src)) < n {
|
if uint64(len(src)) < n {
|
||||||
return nil, nil, fmt.Errorf("missing metric name with length %d in composite tag key %q", n, src)
|
return nil, nil, fmt.Errorf("missing metric name with length %d in composite tag key %q", n, src)
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,12 +88,12 @@ func MarshalBlock(dst []byte, b *Block) []byte {
|
||||||
// Unmarshal unmarshals MetricBlock from src
|
// Unmarshal unmarshals MetricBlock from src
|
||||||
func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
|
func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
|
||||||
mb.Block.Reset()
|
mb.Block.Reset()
|
||||||
tail, mn, err := encoding.UnmarshalBytes(src)
|
mn, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err)
|
return src, fmt.Errorf("cannot unmarshal MetricName")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
mb.MetricName = append(mb.MetricName[:0], mn...)
|
mb.MetricName = append(mb.MetricName[:0], mn...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
return UnmarshalBlock(&mb.Block, src)
|
return UnmarshalBlock(&mb.Block, src)
|
||||||
}
|
}
|
||||||
|
@ -108,19 +108,19 @@ func UnmarshalBlock(dst *Block, src []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
src = tail
|
src = tail
|
||||||
|
|
||||||
tail, tds, err := encoding.UnmarshalBytes(src)
|
tds, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal timestampsData: %w", err)
|
return tail, fmt.Errorf("cannot unmarshal timestampsData")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
dst.timestampsData = append(dst.timestampsData[:0], tds...)
|
dst.timestampsData = append(dst.timestampsData[:0], tds...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, vd, err := encoding.UnmarshalBytes(src)
|
vd, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal valuesData: %w", err)
|
return tail, fmt.Errorf("cannot unmarshal valuesData")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
dst.valuesData = append(dst.valuesData[:0], vd...)
|
dst.valuesData = append(dst.valuesData[:0], vd...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
return src, nil
|
return src, nil
|
||||||
}
|
}
|
||||||
|
@ -365,19 +365,19 @@ func (tf *TagFilter) Marshal(dst []byte) []byte {
|
||||||
|
|
||||||
// Unmarshal unmarshals tf from src and returns the tail.
|
// Unmarshal unmarshals tf from src and returns the tail.
|
||||||
func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
|
func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
|
||||||
tail, k, err := encoding.UnmarshalBytes(src)
|
k, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal Key: %w", err)
|
return src, fmt.Errorf("cannot unmarshal Key")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
tf.Key = append(tf.Key[:0], k...)
|
tf.Key = append(tf.Key[:0], k...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, v, err := encoding.UnmarshalBytes(src)
|
v, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal Value: %w", err)
|
return src, fmt.Errorf("cannot unmarshal Value")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
tf.Value = append(tf.Value[:0], v...)
|
tf.Value = append(tf.Value[:0], v...)
|
||||||
src = tail
|
|
||||||
|
|
||||||
if len(src) < 1 {
|
if len(src) < 1 {
|
||||||
return src, fmt.Errorf("cannot unmarshal IsNegative+IsRegexp from empty src")
|
return src, fmt.Errorf("cannot unmarshal IsNegative+IsRegexp from empty src")
|
||||||
|
@ -445,42 +445,42 @@ func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) {
|
||||||
if len(src) < 4 {
|
if len(src) < 4 {
|
||||||
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
return src, fmt.Errorf("cannot unmarshal AccountID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||||
}
|
}
|
||||||
sq.AccountID = encoding.UnmarshalUint32(src)
|
|
||||||
src = src[4:]
|
src = src[4:]
|
||||||
|
sq.AccountID = encoding.UnmarshalUint32(src)
|
||||||
|
|
||||||
if len(src) < 4 {
|
if len(src) < 4 {
|
||||||
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
return src, fmt.Errorf("cannot unmarshal ProjectID: too short src len: %d; must be at least %d bytes", len(src), 4)
|
||||||
}
|
}
|
||||||
sq.ProjectID = encoding.UnmarshalUint32(src)
|
|
||||||
src = src[4:]
|
src = src[4:]
|
||||||
|
sq.ProjectID = encoding.UnmarshalUint32(src)
|
||||||
|
|
||||||
tail, minTs, err := encoding.UnmarshalVarInt64(src)
|
minTs, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot unmarshal MinTimestamp: %w", err)
|
return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
sq.MinTimestamp = minTs
|
sq.MinTimestamp = minTs
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, maxTs, err := encoding.UnmarshalVarInt64(src)
|
maxTs, nSize := encoding.UnmarshalVarInt64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot unmarshal MaxTimestamp: %w", err)
|
return src, fmt.Errorf("cannot unmarshal MaxTimestamp from varint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
sq.MaxTimestamp = maxTs
|
sq.MaxTimestamp = maxTs
|
||||||
src = tail
|
|
||||||
|
|
||||||
tail, tfssCount, err := encoding.UnmarshalVarUint64(src)
|
tfssCount, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot unmarshal the count of TagFilterss: %w", err)
|
return src, fmt.Errorf("cannot unmarshal the count of TagFilterss from uvarint")
|
||||||
}
|
}
|
||||||
|
src = src[nSize:]
|
||||||
sq.TagFilterss = slicesutil.SetLength(sq.TagFilterss, int(tfssCount))
|
sq.TagFilterss = slicesutil.SetLength(sq.TagFilterss, int(tfssCount))
|
||||||
src = tail
|
|
||||||
|
|
||||||
for i := 0; i < int(tfssCount); i++ {
|
for i := 0; i < int(tfssCount); i++ {
|
||||||
tail, tfsCount, err := encoding.UnmarshalVarUint64(src)
|
tfsCount, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return src, fmt.Errorf("cannot unmarshal the count of TagFilters: %w", err)
|
return src, fmt.Errorf("cannot unmarshal the count of TagFilters from uvarint")
|
||||||
}
|
}
|
||||||
src = tail
|
src = src[nSize:]
|
||||||
|
|
||||||
tagFilters := sq.TagFilterss[i]
|
tagFilters := sq.TagFilterss[i]
|
||||||
tagFilters = slicesutil.SetLength(tagFilters, int(tfsCount))
|
tagFilters = slicesutil.SetLength(tagFilters, int(tfsCount))
|
||||||
|
|
|
@ -1683,25 +1683,26 @@ func UnmarshalMetricRows(dst []MetricRow, src []byte, maxRows int) ([]MetricRow,
|
||||||
//
|
//
|
||||||
// mr refers to src, so it remains valid until src changes.
|
// mr refers to src, so it remains valid until src changes.
|
||||||
func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) {
|
func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) {
|
||||||
tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
|
metricNameRaw, nSize := encoding.UnmarshalBytes(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err)
|
return src, fmt.Errorf("cannot unmarshal MetricName")
|
||||||
}
|
}
|
||||||
|
tail := src[nSize:]
|
||||||
mr.MetricNameRaw = metricNameRaw
|
mr.MetricNameRaw = metricNameRaw
|
||||||
|
|
||||||
if len(tail) < 8 {
|
if len(tail) < 8 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail))
|
return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail))
|
||||||
}
|
}
|
||||||
timestamp := encoding.UnmarshalUint64(tail)
|
timestamp := encoding.UnmarshalUint64(tail)
|
||||||
mr.Timestamp = int64(timestamp)
|
|
||||||
tail = tail[8:]
|
tail = tail[8:]
|
||||||
|
mr.Timestamp = int64(timestamp)
|
||||||
|
|
||||||
if len(tail) < 8 {
|
if len(tail) < 8 {
|
||||||
return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", 8, len(tail))
|
return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", 8, len(tail))
|
||||||
}
|
}
|
||||||
value := encoding.UnmarshalUint64(tail)
|
value := encoding.UnmarshalUint64(tail)
|
||||||
mr.Value = math.Float64frombits(value)
|
|
||||||
tail = tail[8:]
|
tail = tail[8:]
|
||||||
|
mr.Value = math.Float64frombits(value)
|
||||||
|
|
||||||
return tail, nil
|
return tail, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -867,22 +867,23 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab
|
||||||
|
|
||||||
func getOutputKey(key string) string {
|
func getOutputKey(key string) string {
|
||||||
src := bytesutil.ToUnsafeBytes(key)
|
src := bytesutil.ToUnsafeBytes(key)
|
||||||
tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src)
|
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err)
|
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
|
||||||
}
|
}
|
||||||
outputKey := tail[inputKeyLen:]
|
outputKey := src[inputKeyLen:]
|
||||||
return bytesutil.ToUnsafeString(outputKey)
|
return bytesutil.ToUnsafeString(outputKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getInputOutputKey(key string) (string, string) {
|
func getInputOutputKey(key string) (string, string) {
|
||||||
src := bytesutil.ToUnsafeBytes(key)
|
src := bytesutil.ToUnsafeBytes(key)
|
||||||
tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src)
|
inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
|
||||||
if err != nil {
|
if nSize <= 0 {
|
||||||
logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err)
|
logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
|
||||||
}
|
}
|
||||||
inputKey := tail[:inputKeyLen]
|
src = src[nSize:]
|
||||||
outputKey := tail[inputKeyLen:]
|
inputKey := src[:inputKeyLen]
|
||||||
|
outputKey := src[inputKeyLen:]
|
||||||
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue