This commit is contained in:
Aliaksandr Valialkin 2024-05-13 23:44:44 +02:00
parent 812dfd9465
commit 435506b223
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
13 changed files with 95 additions and 88 deletions

View file

@ -489,17 +489,19 @@ func MarshalBytes(dst, b []byte) []byte {
return dst
}
// UnmarshalBytes returns unmarshaled bytes from src.
func UnmarshalBytes(src []byte) ([]byte, []byte, error) {
// UnmarshalBytes returns unmarshaled bytes from src and the size of the unmarshaled bytes.
//
// It returns 0 or negative value if it is impossible to unmarshal bytes from src.
func UnmarshalBytes(src []byte) ([]byte, int) {
n, nSize := UnmarshalVarUint64(src)
if nSize <= 0 {
return nil, nil, fmt.Errorf("cannot unmarshal string size from uvarint")
return nil, 0
}
src = src[nSize:]
if uint64(len(src)) < n {
return nil, nil, fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", n, len(src))
return nil, 0
}
return src[n:], src[:n], nil
return src[:n], nSize + int(n)
}
// GetInt64s returns an int64 slice with the given size.

View file

@ -312,10 +312,11 @@ func testMarshalUnmarshalBytes(t *testing.T, s string) {
t.Helper()
b := MarshalBytes(nil, []byte(s))
tail, bNew, err := UnmarshalBytes(b)
if err != nil {
t.Fatalf("unexpected error when unmarshaling s=%q from b=%x: %s", s, b, err)
bNew, nSize := UnmarshalBytes(b)
if nSize <= 0 {
t.Fatalf("unexpected error when unmarshaling s=%q from b=%x", s, b)
}
tail := b[nSize:]
if string(bNew) != s {
t.Fatalf("unexpected sNew from b=%x; got %q; expecting %q", b, bNew, s)
}

View file

@ -504,12 +504,12 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
srcOrig := src
// Unmarshal column name
tail, data, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal column name: %w", err)
data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal column name")
}
src = src[nSize:]
ch.name = a.copyBytesToString(data)
src = tail
// Unmarshal value type
if len(src) < 1 {
@ -521,13 +521,13 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
// Unmarshal the rest of data depending on valueType
switch ch.valueType {
case valueTypeString:
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeString for column %q: %w", ch.name, err)
}
src = tail
case valueTypeDict:
tail, err = ch.valuesDict.unmarshal(a, src)
tail, err := ch.valuesDict.unmarshal(a, src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
}
@ -546,7 +546,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = uint64(src[1])
src = src[2:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint8 for column %q: %w", ch.name, err)
}
@ -559,7 +559,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = uint64(encoding.UnmarshalUint16(src[2:]))
src = src[4:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint16 for column %q: %w", ch.name, err)
}
@ -572,7 +572,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
src = src[8:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint32 for column %q: %w", ch.name, err)
}
@ -585,7 +585,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = encoding.UnmarshalUint64(src[8:])
src = src[16:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeUint64 for column %q: %w", ch.name, err)
}
@ -599,7 +599,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = encoding.UnmarshalUint64(src[8:])
src = src[16:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeFloat64 for column %q: %w", ch.name, err)
}
@ -612,7 +612,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = uint64(encoding.UnmarshalUint32(src[4:]))
src = src[8:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeIPv4 for column %q: %w", ch.name, err)
}
@ -626,7 +626,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.maxValue = encoding.UnmarshalUint64(src[8:])
src = src[16:]
tail, err = ch.unmarshalValuesAndBloomFilters(src)
tail, err := ch.unmarshalValuesAndBloomFilters(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal values and bloom filters at valueTypeTimestampISO8601 for column %q: %w", ch.name, err)
}

View file

@ -398,12 +398,12 @@ func (psp *pipeStatsProcessor) flush() error {
values = values[:0]
keyBuf := bytesutil.ToUnsafeBytes(key)
for len(keyBuf) > 0 {
tail, v, err := encoding.UnmarshalBytes(keyBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q: %w", keyBuf, err)
v, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", keyBuf)
}
keyBuf = keyBuf[nSize:]
values = append(values, bytesutil.ToUnsafeString(v))
keyBuf = tail
}
if len(values) != len(byFields) {
logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))

View file

@ -238,17 +238,17 @@ func (pup *pipeUniqProcessor) flush() error {
rowFields = rowFields[:0]
keyBuf := bytesutil.ToUnsafeBytes(k)
for len(keyBuf) > 0 {
tail, name, err := encoding.UnmarshalBytes(keyBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal field name: %s", err)
name, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field name")
}
keyBuf = tail
keyBuf = keyBuf[nSize:]
tail, value, err := encoding.UnmarshalBytes(keyBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
value, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field value")
}
keyBuf = tail
keyBuf = keyBuf[nSize:]
rowFields = append(rowFields, Field{
Name: bytesutil.ToUnsafeString(name),
@ -269,11 +269,11 @@ func (pup *pipeUniqProcessor) flush() error {
keyBuf := bytesutil.ToUnsafeBytes(k)
fieldIdx := 0
for len(keyBuf) > 0 {
tail, value, err := encoding.UnmarshalBytes(keyBuf)
if err != nil {
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
value, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal field value")
}
keyBuf = tail
keyBuf = keyBuf[nSize:]
rowFields = append(rowFields, Field{
Name: byFields[fieldIdx],

View file

@ -38,20 +38,20 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
srcOrig := src
// Unmarshal field name
tail, b, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
b, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field name")
}
src = src[nSize:]
f.Name = a.copyBytesToString(b)
src = tail
// Unmarshal field value
tail, b, err = encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
b, nSize = encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field value")
}
src = src[nSize:]
f.Value = a.copyBytesToString(b)
src = tail
return src, nil
}

View file

@ -125,17 +125,17 @@ func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error) {
}
src = src[nSize:]
for i := uint64(0); i < n; i++ {
tail, name, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal tag name: %w", err)
name, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal tag name")
}
src = tail
src = src[nSize:]
tail, value, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal tag value: %w", err)
value, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal tag value")
}
src = tail
src = src[nSize:]
sName := bytesutil.ToUnsafeString(name)
sValue := bytesutil.ToUnsafeString(value)

View file

@ -1145,11 +1145,11 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
dictLen := int(src[0])
src = src[1:]
for i := 0; i < dictLen; i++ {
tail, data, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err)
data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict", i, dictLen)
}
src = tail
src = src[nSize:]
v := a.copyBytesToString(data)
vd.values = append(vd.values, v)

View file

@ -77,20 +77,20 @@ func (bh *blockHeader) Marshal(dst []byte) []byte {
func (bh *blockHeader) UnmarshalNoCopy(src []byte) ([]byte, error) {
bh.noCopy = true
// Unmarshal commonPrefix
tail, cp, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal commonPrefix: %w", err)
cp, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal commonPrefix")
}
src = src[nSize:]
bh.commonPrefix = cp[:len(cp):len(cp)]
src = tail
// Unmarshal firstItem
tail, fi, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
fi, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal firstItem")
}
src = src[nSize:]
bh.firstItem = fi[:len(fi):len(fi)]
src = tail
// Unmarshal marshalType
if len(src) == 0 {

View file

@ -41,12 +41,12 @@ func (mr *metaindexRow) Marshal(dst []byte) []byte {
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
// Unmarshal firstItem
tail, fi, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal firstItem: %w", err)
fi, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal firstItem")
}
src = src[nSize:]
mr.firstItem = append(mr.firstItem[:0], fi...)
src = tail
// Unmarshal blockHeadersCount
if len(src) < 4 {

View file

@ -369,15 +369,18 @@ func (b *Block) UnmarshalPortable(src []byte) ([]byte, error) {
if err != nil {
return src, err
}
src, timestampsData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read timestampsData: %w", err)
timestampsData, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot read timestampsData")
}
src = src[nSize:]
b.timestampsData = append(b.timestampsData[:0], timestampsData...)
src, valuesData, err := encoding.UnmarshalBytes(src)
if err != nil {
return src, fmt.Errorf("cannot read valuesData: %w", err)
valuesData, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot read valuesData")
}
src = src[nSize:]
b.valuesData = append(b.valuesData[:0], valuesData...)
if err := b.bh.validate(); err != nil {

View file

@ -322,19 +322,19 @@ func (tf *TagFilter) Marshal(dst []byte) []byte {
// Unmarshal unmarshals tf from src and returns the tail.
func (tf *TagFilter) Unmarshal(src []byte) ([]byte, error) {
tail, k, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal Key: %w", err)
k, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal Key")
}
src = src[nSize:]
tf.Key = append(tf.Key[:0], k...)
src = tail
tail, v, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal Value: %w", err)
v, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal Value")
}
src = src[nSize:]
tf.Value = append(tf.Value[:0], v...)
src = tail
if len(src) < 1 {
return src, fmt.Errorf("cannot unmarshal IsNegative+IsRegexp from empty src")

View file

@ -1575,25 +1575,26 @@ func (mr *MetricRow) Marshal(dst []byte) []byte {
//
// mr refers to src, so it remains valid until src changes.
func (mr *MetricRow) UnmarshalX(src []byte) ([]byte, error) {
tail, metricNameRaw, err := encoding.UnmarshalBytes(src)
if err != nil {
return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err)
metricNameRaw, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 {
return src, fmt.Errorf("cannot unmarshal MetricName")
}
tail := src[nSize:]
mr.MetricNameRaw = metricNameRaw
if len(tail) < 8 {
return tail, fmt.Errorf("cannot unmarshal Timestamp: want %d bytes; have %d bytes", 8, len(tail))
}
timestamp := encoding.UnmarshalUint64(tail)
mr.Timestamp = int64(timestamp)
tail = tail[8:]
mr.Timestamp = int64(timestamp)
if len(tail) < 8 {
return tail, fmt.Errorf("cannot unmarshal Value: want %d bytes; have %d bytes", 8, len(tail))
}
value := encoding.UnmarshalUint64(tail)
mr.Value = math.Float64frombits(value)
tail = tail[8:]
mr.Value = math.Float64frombits(value)
return tail, nil
}