diff --git a/lib/encoding/encoding.go b/lib/encoding/encoding.go index 89168529d..6effdb41c 100644 --- a/lib/encoding/encoding.go +++ b/lib/encoding/encoding.go @@ -231,12 +231,12 @@ func unmarshalInt64Array(dst []int64, src []byte, mt MarshalType, firstValue int return dst, nil case MarshalTypeDeltaConst: v := firstValue - tail, d, err := UnmarshalVarInt64(src) - if err != nil { + d, nLen := UnmarshalVarInt64(src) + if nLen <= 0 { return nil, fmt.Errorf("cannot unmarshal delta value for delta const: %w", err) } - if len(tail) > 0 { - return nil, fmt.Errorf("unexpected trailing data after delta const (d=%d): %d bytes", d, len(tail)) + if nLen < len(src) { + return nil, fmt.Errorf("unexpected trailing data after delta const (d=%d): %d bytes", d, len(src)-nLen) } for itemsCount > 0 { dst = append(dst, v) diff --git a/lib/encoding/int.go b/lib/encoding/int.go index aef274a8e..a0cd4a715 100644 --- a/lib/encoding/int.go +++ b/lib/encoding/int.go @@ -167,16 +167,15 @@ func marshalVarInt64sSlow(dst []byte, vs []int64) []byte { return dst } -// UnmarshalVarInt64 returns unmarshaled int64 from src and returns the remaining tail from src. -func UnmarshalVarInt64(src []byte) ([]byte, int64, error) { +// UnmarshalVarInt64 returns unmarshaled int64 from src and its size in bytes. +// +// It returns 0 or negative value if it cannot unmarshal int64 from src. +func UnmarshalVarInt64(src []byte) (int64, int) { // TODO substitute binary.Uvarint with binary.Varint when benchmark results will show it is faster. // It is slower on amd64/linux Go1.22. - u64, offset := binary.Uvarint(src) - if offset <= 0 { - return src, 0, fmt.Errorf("cannot unmarshal varint") - } + u64, nSize := binary.Uvarint(src) i64 := int64(int64(u64>>1) ^ (int64(u64<<63) >> 63)) - return src[offset:], i64, nil + return i64, nSize } // UnmarshalVarInt64s unmarshals len(dst) int64 values from src to dst and returns the remaining tail from src. @@ -363,13 +362,27 @@ func marshalVarUint64sSlow(dst []byte, us []uint64) []byte { return dst } -// UnmarshalVarUint64 returns unmarshaled uint64 from src and returns the remaining tail from src. -func UnmarshalVarUint64(src []byte) ([]byte, uint64, error) { - u64, offset := binary.Uvarint(src) - if offset <= 0 { - return src, 0, fmt.Errorf("cannot read varuint") +// UnmarshalVarUint64 returns unmarshaled uint64 from src and its size in bytes. +// +// It returns 0 or negative value if it cannot unmarshal uint64 from src. +func UnmarshalVarUint64(src []byte) (uint64, int) { + if len(src) == 0 { + return 0, 0 } - return src[offset:], u64, nil + if src[0] < 0x80 { + // Fast path for a single byte + return uint64(src[0]), 1 + } + if len(src) == 1 { + return 0, 0 + } + if src[1] < 0x80 { + // Fast path for two bytes + return uint64(src[0]&0x7f) | uint64(src[1])<<7, 2 + } + + // Slow path for other number of bytes + return binary.Uvarint(src) } // UnmarshalVarUint64s unmarshals len(dst) uint64 values from src to dst and returns the remaining tail from src. @@ -496,17 +509,20 @@ func MarshalBytes(dst, b []byte) []byte { return dst } -// UnmarshalBytes returns unmarshaled bytes from src. -func UnmarshalBytes(src []byte) ([]byte, []byte, error) { - tail, n, err := UnmarshalVarUint64(src) - if err != nil { - return nil, nil, fmt.Errorf("cannot unmarshal string size: %w", err) +// UnmarshalBytes returns unmarshaled bytes from src and the size of the unmarshaled bytes. +// +// It returns 0 or negative value if it is impossible to unmarshal bytes from src. +func UnmarshalBytes(src []byte) ([]byte, int) { + n, nSize := UnmarshalVarUint64(src) + if nSize <= 0 { + return nil, 0 } - src = tail - if uint64(len(src)) < n { - return nil, nil, fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", n, len(src)) + if uint64(nSize)+n > uint64(len(src)) { + return nil, 0 } - return src[n:], src[:n], nil + start := nSize + nSize += int(n) + return src[start:nSize], nSize } // GetInt64s returns an int64 slice with the given size. diff --git a/lib/encoding/int_test.go b/lib/encoding/int_test.go index d38bbe81d..f8b75c94c 100644 --- a/lib/encoding/int_test.go +++ b/lib/encoding/int_test.go @@ -224,10 +224,11 @@ func testMarshalUnmarshalVarInt64(t *testing.T, v int64) { t.Helper() b := MarshalVarInt64(nil, v) - tail, vNew, err := UnmarshalVarInt64(b) - if err != nil { - t.Fatalf("unexpected error when unmarshaling v=%d from b=%x: %s", v, b, err) + vNew, nSize := UnmarshalVarInt64(b) + if nSize <= 0 { + t.Fatalf("unexpected error when unmarshaling v=%d from b=%x", v, b) } + tail := b[nSize:] if vNew != v { t.Fatalf("unexpected vNew from b=%x; got %d; expecting %d", b, vNew, v) } @@ -272,10 +273,11 @@ func testMarshalUnmarshalVarUint64(t *testing.T, u uint64) { t.Helper() b := MarshalVarUint64(nil, u) - tail, uNew, err := UnmarshalVarUint64(b) - if err != nil { - t.Fatalf("unexpected error when unmarshaling u=%d from b=%x: %s", u, b, err) + uNew, nSize := UnmarshalVarUint64(b) + if nSize <= 0 { + t.Fatalf("unexpected error when unmarshaling u=%d from b=%x", u, b) } + tail := b[nSize:] if uNew != u { t.Fatalf("unexpected uNew from b=%x; got %d; expecting %d", b, uNew, u) } @@ -310,10 +312,11 @@ func testMarshalUnmarshalBytes(t *testing.T, s string) { t.Helper() b := MarshalBytes(nil, []byte(s)) - tail, bNew, err := UnmarshalBytes(b) - if err != nil { - t.Fatalf("unexpected error when unmarshaling s=%q from b=%x: %s", s, b, err) + bNew, nSize := UnmarshalBytes(b) + if nSize <= 0 { + t.Fatalf("unexpected error when unmarshaling s=%q from b=%x", s, b) } + tail := b[nSize:] if string(bNew) != s { t.Fatalf("unexpected sNew from b=%x; got %q; expecting %q", b, bNew, s) } diff --git a/lib/encoding/int_timing_test.go b/lib/encoding/int_timing_test.go index 481316e2e..602cf8486 100644 --- a/lib/encoding/int_timing_test.go +++ b/lib/encoding/int_timing_test.go @@ -173,12 +173,12 @@ func benchmarkUnmarshalVarUint64(b *testing.B, maxValue uint64) { for pb.Next() { src := data for len(src) > 0 { - tail, n, err := UnmarshalVarUint64(src) - if err != nil { - panic(fmt.Errorf("unexpected error: %w", err)) + n, nSize := UnmarshalVarUint64(src) + if nSize <= 0 { + panic(fmt.Errorf("unexpected error")) } + src = src[nSize:] sink += n - src = tail } } Sink.Add(sink) @@ -265,12 +265,12 @@ func benchmarkUnmarshalVarInt64(b *testing.B, maxValue int64) { for pb.Next() { src := data for len(src) > 0 { - tail, n, err := UnmarshalVarInt64(src) - if err != nil { - panic(fmt.Errorf("unexpected error: %w", err)) + n, nSize := UnmarshalVarInt64(src) + if nSize <= 0 { + panic(fmt.Errorf("unexpected error")) } + src = src[nSize:] sink += uint64(n) - src = tail } } Sink.Add(sink) diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index edd28fe9f..578cae535 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -85,23 +85,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) { src = tail // unmarshal bh.uncompressedSizeBytes - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal uncompressedSizeBytes") } + src = src[nSize:] bh.uncompressedSizeBytes = n - src = tail // unmarshal bh.rowsCount - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal rowsCount: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal rowsCount") } + src = src[nSize:] if n > maxRowsPerBlock { return srcOrig, fmt.Errorf("too big value for rowsCount: %d; mustn't exceed %d", n, maxRowsPerBlock) } bh.rowsCount = n - src = tail // unmarshal bh.timestampsHeader tail, err = bh.timestampsHeader.unmarshal(src) @@ -111,23 +111,23 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) { src = tail // unmarshal columnsHeaderOffset - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderOffset") } + src = src[nSize:] bh.columnsHeaderOffset = n - src = tail // unmarshal columnsHeaderSize - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderSize") } + src = src[nSize:] if n > maxColumnsHeaderSize { return srcOrig, fmt.Errorf("too big value for columnsHeaderSize: %d; mustn't exceed %d", n, maxColumnsHeaderSize) } bh.columnsHeaderSize = n - src = tail return src, nil } @@ -296,17 +296,18 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error { csh.reset() // unmarshal columnHeaders - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return fmt.Errorf("cannot unmarshal columnHeaders len: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return fmt.Errorf("cannot unmarshal columnHeaders len") } + src = src[nSize:] if n > maxColumnsPerBlock { return fmt.Errorf("too many column headers: %d; mustn't exceed %d", n, maxColumnsPerBlock) } - src = tail + chs := csh.resizeColumnHeaders(int(n)) for i := range chs { - tail, err = chs[i].unmarshal(a, src) + tail, err := chs[i].unmarshal(a, src) if err != nil { return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err) } @@ -315,17 +316,18 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error { csh.columnHeaders = chs // unmarshal constColumns - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return fmt.Errorf("cannot unmarshal constColumns len: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return fmt.Errorf("cannot unmarshal constColumns len") } + src = src[nSize:] if n+uint64(len(csh.columnHeaders)) > maxColumnsPerBlock { return fmt.Errorf("too many columns: %d; mustn't exceed %d", n+uint64(len(csh.columnHeaders)), maxColumnsPerBlock) } - src = tail + ccs := csh.resizeConstColumns(int(n)) for i := range ccs { - tail, err = ccs[i].unmarshal(a, src) + tail, err := ccs[i].unmarshal(a, src) if err != nil { return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err) } @@ -504,12 +506,12 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { srcOrig := src // Unmarshal column name - tail, data, err := encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal column name: %w", err) + data, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal column name") } + src = src[nSize:] ch.name = a.copyBytesToString(data) - src = tail // Unmarshal value type if len(src) < 1 { @@ -521,13 +523,13 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { // Unmarshal the rest of data depending on valueType switch ch.valueType { case valueTypeString: - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeString for column %q: %w", ch.name, err) } src = tail case valueTypeDict: - tail, err = ch.valuesDict.unmarshal(a, src) + tail, err := ch.valuesDict.unmarshal(a, src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err) } @@ -546,7 +548,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = uint64(src[1]) src = src[2:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint8 for column %q: %w", ch.name, err) } @@ -559,7 +561,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = uint64(encoding.UnmarshalUint16(src[2:])) src = src[4:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint16 for column %q: %w", ch.name, err) } @@ -572,7 +574,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:])) src = src[8:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint32 for column %q: %w", ch.name, err) } @@ -585,7 +587,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = encoding.UnmarshalUint64(src[8:]) src = src[16:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint64 for column %q: %w", ch.name, err) } @@ -599,7 +601,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = encoding.UnmarshalUint64(src[8:]) src = src[16:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeFloat64 for column %q: %w", ch.name, err) } @@ -612,7 +614,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:])) src = src[8:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeIPv4 for column %q: %w", ch.name, err) } @@ -626,7 +628,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) { ch.maxValue = encoding.UnmarshalUint64(src[8:]) src = src[16:] - tail, err = ch.unmarshalValuesAndBloomFilters(src) + tail, err := ch.unmarshalValuesAndBloomFilters(src) if err != nil { return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeTimestampISO8601 for column %q: %w", ch.name, err) } @@ -659,22 +661,22 @@ func (ch *columnHeader) unmarshalValuesAndBloomFilters(src []byte) ([]byte, erro func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) { srcOrig := src - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal valuesOffset") } + src = src[nSize:] ch.valuesOffset = n - src = tail - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal valuesSize: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal valuesSize") } + src = src[nSize:] if n > maxValuesBlockSize { return srcOrig, fmt.Errorf("too big valuesSize: %d bytes; mustn't exceed %d bytes", n, maxValuesBlockSize) } ch.valuesSize = n - src = tail return src, nil } @@ -682,22 +684,22 @@ func (ch *columnHeader) unmarshalValues(src []byte) ([]byte, error) { func (ch *columnHeader) unmarshalBloomFilters(src []byte) ([]byte, error) { srcOrig := src - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterOffset") } + src = src[nSize:] ch.bloomFilterOffset = n - src = tail - tail, n, err = encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize: %w", err) + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal bloomFilterSize") } + src = src[nSize:] if n > maxBloomFilterBlockSize { return srcOrig, fmt.Errorf("too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", n, maxBloomFilterBlockSize) } ch.bloomFilterSize = n - src = tail return src, nil } diff --git a/lib/logstorage/encoding.go b/lib/logstorage/encoding.go index ccce895f7..083519957 100644 --- a/lib/logstorage/encoding.go +++ b/lib/logstorage/encoding.go @@ -279,11 +279,11 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) { // Compressed block // Read block length - tail, blockLen, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return dst, src, fmt.Errorf("cannot unmarshal compressed block size: %w", err) + blockLen, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return dst, src, fmt.Errorf("cannot unmarshal compressed block size") } - src = tail + src = src[nSize:] if uint64(len(src)) < blockLen { return dst, src, fmt.Errorf("cannot read compressed block with the size %d bytes from %d bytes", blockLen, len(src)) } @@ -292,6 +292,7 @@ func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) { // Decompress the block bb := bbPool.Get() + var err error bb.B, err = encoding.DecompressZSTD(bb.B[:0], compressedBlock) if err != nil { return dst, src, fmt.Errorf("cannot decompress block: %w", err) diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index e98afb244..dba66b3a1 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -507,14 +507,14 @@ func (idb *indexdb) loadStreamIDsFromCache(tenantIDs []TenantID, sf *StreamFilte return nil, false } // Cache hit - unpack streamIDs from data. - tail, n, err := encoding.UnmarshalVarUint64(data) - if err != nil { - logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache: %s", err) + n, nSize := encoding.UnmarshalVarUint64(data) + if nSize <= 0 { + logger.Panicf("BUG: unexpected error when unmarshaling the number of streamIDs from cache") } - src := tail + src := data[nSize:] streamIDs := make([]streamID, n) for i := uint64(0); i < n; i++ { - tail, err = streamIDs[i].unmarshal(src) + tail, err := streamIDs[i].unmarshal(src) if err != nil { logger.Panicf("BUG: unexpected error when unmarshaling streamID #%d: %s", i, err) } diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 88a0bf8d3..3a9d8980b 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -398,12 +398,12 @@ func (psp *pipeStatsProcessor) flush() error { values = values[:0] keyBuf := bytesutil.ToUnsafeBytes(key) for len(keyBuf) > 0 { - tail, v, err := encoding.UnmarshalBytes(keyBuf) - if err != nil { - logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q: %w", keyBuf, err) + v, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", keyBuf) } + keyBuf = keyBuf[nSize:] values = append(values, bytesutil.ToUnsafeString(v)) - keyBuf = tail } if len(values) != len(byFields) { logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields)) diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 6c171b931..6cdd46fc7 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -238,17 +238,17 @@ func (pup *pipeUniqProcessor) flush() error { rowFields = rowFields[:0] keyBuf := bytesutil.ToUnsafeBytes(k) for len(keyBuf) > 0 { - tail, name, err := encoding.UnmarshalBytes(keyBuf) - if err != nil { - logger.Panicf("BUG: cannot unmarshal field name: %s", err) + name, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field name") } - keyBuf = tail + keyBuf = keyBuf[nSize:] - tail, value, err := encoding.UnmarshalBytes(keyBuf) - if err != nil { - logger.Panicf("BUG: cannot unmarshal field value: %s", err) + value, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field value") } - keyBuf = tail + keyBuf = keyBuf[nSize:] rowFields = append(rowFields, Field{ Name: bytesutil.ToUnsafeString(name), @@ -269,11 +269,11 @@ func (pup *pipeUniqProcessor) flush() error { keyBuf := bytesutil.ToUnsafeBytes(k) fieldIdx := 0 for len(keyBuf) > 0 { - tail, value, err := encoding.UnmarshalBytes(keyBuf) - if err != nil { - logger.Panicf("BUG: cannot unmarshal field value: %s", err) + value, nSize := encoding.UnmarshalBytes(keyBuf) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal field value") } - keyBuf = tail + keyBuf = keyBuf[nSize:] rowFields = append(rowFields, Field{ Name: byFields[fieldIdx], diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index 62ab3c53f..fe7c268fd 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -38,20 +38,20 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) { srcOrig := src // Unmarshal field name - tail, b, err := encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err) + b, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal field name") } + src = src[nSize:] f.Name = a.copyBytesToString(b) - src = tail // Unmarshal field value - tail, b, err = encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err) + b, nSize = encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal field value") } + src = src[nSize:] f.Value = a.copyBytesToString(b) - src = tail return src, nil } diff --git a/lib/logstorage/stream_tags.go b/lib/logstorage/stream_tags.go index b83567cc3..2e946bab6 100644 --- a/lib/logstorage/stream_tags.go +++ b/lib/logstorage/stream_tags.go @@ -119,23 +119,23 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) { srcOrig := src - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal tags len: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal tags len") } - src = tail + src = src[nSize:] for i := uint64(0); i < n; i++ { - tail, name, err := encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal tag name: %w", err) + name, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal tag name") } - src = tail + src = src[nSize:] - tail, value, err := encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot unmarshal tag value: %w", err) + value, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal tag value") } - src = tail + src = src[nSize:] sName := bytesutil.ToUnsafeString(name) sValue := bytesutil.ToUnsafeString(value) diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 131721b70..9b7da6069 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -1137,11 +1137,11 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) { dictLen := int(src[0]) src = src[1:] for i := 0; i < dictLen; i++ { - tail, data, err := encoding.UnmarshalBytes(src) - if err != nil { - return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err) + data, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict", i, dictLen) } - src = tail + src = src[nSize:] v := a.copyBytesToString(data) vd.values = append(vd.values, v) diff --git a/lib/mergeset/block_header.go b/lib/mergeset/block_header.go index 817fee7e6..a0fb1af2c 100644 --- a/lib/mergeset/block_header.go +++ b/lib/mergeset/block_header.go @@ -77,20 +77,20 @@ func (bh *blockHeader) Marshal(dst []byte) []byte { func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) { bh.noCopy = true // Unmarshal commonPrefix - tail, cp, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err) + cp, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal commonPrefix") } + src = src[nSize:] bh.commonPrefix = cp[:len(cp):len(cp)] - src = tail // Unmarshal firstItem - tail, fi, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err) + fi, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal firstItem") } + src = src[nSize:] bh.firstItem = fi[:len(fi):len(fi)] - src = tail // Unmarshal marshalType if len(src) == 0 { diff --git a/lib/mergeset/metaindex_row.go b/lib/mergeset/metaindex_row.go index e80ba19a4..6e3e02ad5 100644 --- a/lib/mergeset/metaindex_row.go +++ b/lib/mergeset/metaindex_row.go @@ -41,12 +41,12 @@ func (mr *metaindexRow) Marshal(dst []byte) []byte { func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) { // Unmarshal firstItem - tail, fi, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err) + fi, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal firstItem") } + src = src[nSize:] mr.firstItem = append(mr.firstItem[:0], fi...) - src = tail // Unmarshal blockHeadersCount if len(src) < 4 { diff --git a/lib/promutils/labelscompressor.go b/lib/promutils/labelscompressor.go index a135d8e92..391b8f3c5 100644 --- a/lib/promutils/labelscompressor.go +++ b/lib/promutils/labelscompressor.go @@ -91,10 +91,11 @@ func cloneLabel(label prompbmarshal.Label) prompbmarshal.Label { // // It is safe calling Decompress from concurrent goroutines. func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label { - tail, labelsLen, err := encoding.UnmarshalVarUint64(src) - if err != nil { - logger.Panicf("BUG: cannot unmarshal labels length: %s", err) + labelsLen, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal labels length from uvarint") } + tail := src[nSize:] if labelsLen == 0 { // fast path - nothing to decode if len(tail) > 0 { @@ -104,6 +105,7 @@ func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) [] } a := encoding.GetUint64s(int(labelsLen)) + var err error tail, err = encoding.UnmarshalVarUint64s(a.A, tail) if err != nil { logger.Panicf("BUG: cannot unmarshal label indexes: %s", err) diff --git a/lib/storage/block.go b/lib/storage/block.go index 43635f651..0aa4e0616 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -369,15 +369,18 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) { if err != nil { return src, err } - src, timestampsData, err := encoding.UnmarshalBytes(src) - if err != nil { - return src, fmt.Errorf("cannot read timestampsData: %w", err) + timestampsData, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot read timestampsData") } + src = src[nSize:] b.timestampsData = append(b.timestampsData[:0], timestampsData...) - src, valuesData, err := encoding.UnmarshalBytes(src) - if err != nil { - return src, fmt.Errorf("cannot read valuesData: %w", err) + + valuesData, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot read valuesData") } + src = src[nSize:] b.valuesData = append(b.valuesData[:0], valuesData...) if err := b.bh.validate(); err != nil { diff --git a/lib/storage/block_header.go b/lib/storage/block_header.go index f7689f5f1..293411464 100644 --- a/lib/storage/block_header.go +++ b/lib/storage/block_header.go @@ -167,33 +167,42 @@ func (bh *blockHeader) marshalPortable(dst []byte) []byte { } func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) { - src, minTimestamp, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err) + minTimestamp, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal firstTimestamp from varint") } + src = src[nSize:] bh.MinTimestamp = minTimestamp - src, maxTimestamp, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal firstTimestamp: %w", err) + + maxTimestamp, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal firstTimestamp rom varint") } + src = src[nSize:] bh.MaxTimestamp = maxTimestamp - src, firstValue, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal firstValue: %w", err) + + firstValue, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal firstValue from varint") } + src = src[nSize:] bh.FirstValue = firstValue - src, rowsCount, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal rowsCount: %w", err) + + rowsCount, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal rowsCount from varuint") } + src = src[nSize:] if rowsCount > math.MaxUint32 { return src, fmt.Errorf("got too big rowsCount=%d; it mustn't exceed %d", rowsCount, uint32(math.MaxUint32)) } bh.RowsCount = uint32(rowsCount) - src, scale, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal scale: %w", err) + + scale, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal scale from varint") } + src = src[nSize:] if scale < math.MinInt16 { return src, fmt.Errorf("got too small scale=%d; it mustn't be smaller than %d", scale, math.MinInt16) } @@ -204,6 +213,7 @@ func (bh *blockHeader) unmarshalPortable(src []byte) ([]byte, error) { if len(src) < 1 { return src, fmt.Errorf("cannot unmarshal marshalType for timestamps from %d bytes; need at least %d bytes", len(src), 1) } + bh.TimestampsMarshalType = encoding.MarshalType(src[0]) src = src[1:] if len(src) < 1 { diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index af698e5bd..848f90cb1 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -2015,11 +2015,11 @@ func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter { continue } tagKey = tagKey[1:] - var nameLen uint64 - tagKey, nameLen, err = encoding.UnmarshalVarUint64(tagKey) - if err != nil { - logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err) + nameLen, nSize := encoding.UnmarshalVarUint64(tagKey) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q", tagKey) } + tagKey = tagKey[nSize:] if nameLen == 0 { logger.Panicf("BUG: nameLen must be greater than 0") } @@ -2830,11 +2830,11 @@ func unmarshalCompositeTagKey(src []byte) ([]byte, []byte, error) { return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src) } src = src[1:] - tail, n, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key: %w", err) + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key") } - src = tail + src = src[nSize:] if uint64(len(src)) < n { return nil, nil, fmt.Errorf("missing metric name with length %d in composite tag key %q", n, src) } diff --git a/lib/storage/search.go b/lib/storage/search.go index 0e406b344..bd7d87982 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -322,19 +322,19 @@ func (tf *TagFilter) Marshal(dst []byte) []byte { // Unmarshal unmarshals tf from src and returns the tail. func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) { - tail, k, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal Key: %w", err) + k, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal Key") } + src = src[nSize:] tf.Key = append(tf.Key[:0], k...) - src = tail - tail, v, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal Value: %w", err) + v, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal Value") } + src = src[nSize:] tf.Value = append(tf.Value[:0], v...) - src = tail if len(src) < 1 { return src, fmt.Errorf("cannot unmarshal IsNegative+IsRegexp from empty src") @@ -396,33 +396,33 @@ func (sq *SearchQuery) Marshal(dst []byte) []byte { // Unmarshal unmarshals sq from src and returns the tail. func (sq *SearchQuery) Unmarshal(src []byte) ([]byte, error) { - tail, minTs, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal MinTimestamp: %w", err) + minTs, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal MinTimestamp from varint") } + src = src[nSize:] sq.MinTimestamp = minTs - src = tail - tail, maxTs, err := encoding.UnmarshalVarInt64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal MaxTimestamp: %w", err) + maxTs, nSize := encoding.UnmarshalVarInt64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal MaxTimestamp from varint") } + src = src[nSize:] sq.MaxTimestamp = maxTs - src = tail - tail, tfssCount, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal the count of TagFilterss: %w", err) + tfssCount, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal the count of TagFilterss from uvarint") } + src = src[nSize:] sq.TagFilterss = slicesutil.SetLength(sq.TagFilterss, int(tfssCount)) - src = tail for i := 0; i < int(tfssCount); i++ { - tail, tfsCount, err := encoding.UnmarshalVarUint64(src) - if err != nil { - return src, fmt.Errorf("cannot unmarshal the count of TagFilters: %w", err) + tfsCount, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal the count of TagFilters from uvarint") } - src = tail + src = src[nSize:] tagFilters := sq.TagFilterss[i] tagFilters = slicesutil.SetLength(tagFilters, int(tfsCount)) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index c3f050f2c..bc3d4a288 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1575,25 +1575,26 @@ func (mr *MetricRow) Marshal(dst []byte) []byte { // // mr refers to src, so it remains valid until src changes. func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) { - tail, metricNameRaw, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err) + metricNameRaw, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return src, fmt.Errorf("cannot unmarshal MetricName") } + tail := src[nSize:] mr.MetricNameRaw = metricNameRaw if len(tail) < 8 { return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail)) } timestamp := encoding.UnmarshalUint64(tail) - mr.Timestamp = int64(timestamp) tail = tail[8:] + mr.Timestamp = int64(timestamp) if len(tail) < 8 { return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", 8, len(tail)) } value := encoding.UnmarshalUint64(tail) - mr.Value = math.Float64frombits(value) tail = tail[8:] + mr.Value = math.Float64frombits(value) return tail, nil } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 261967853..f7110290e 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -867,22 +867,23 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab func getOutputKey(key string) string { src := bytesutil.ToUnsafeBytes(key) - tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src) - if err != nil { - logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err) + inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") } - outputKey := tail[inputKeyLen:] + outputKey := src[inputKeyLen:] return bytesutil.ToUnsafeString(outputKey) } func getInputOutputKey(key string) (string, string) { src := bytesutil.ToUnsafeBytes(key) - tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src) - if err != nil { - logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err) + inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") } - inputKey := tail[:inputKeyLen] - outputKey := tail[inputKeyLen:] + src = src[nSize:] + inputKey := src[:inputKeyLen] + outputKey := src[inputKeyLen:] return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey) }