mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
7bb5f75a2a
- Move uniqueFields from rows to blockStreamMerger struct. This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(), which should improve readability and maintainability of the code. - Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock, since the provided logging didn't provide the solution for the issue with too many columns. I couldn't figure out the proper solution, which could be helpful for end user, so decided to remove the logging until we find the solution. This commit also contains the following additional changes: - It truncates field names longer than 128 chars during logs ingestion. This should prevent from ingesting bogus field names. This also should prevent from too big columnsHeader blocks, which could negatively affect search query performance, since columnsHeader is read on every scan of the corresponding data block. - It limits the maximum length of const column value to 256. Longer values are stored in an ordinary columns. This helps limiting the size of columnsHeader blocks and improving search query performance by avoiding reading too long const columns on every scan of the corresponding data block. - It deduplicates columns with identical names during data ingestion and background merging. Previously it was possible to pass columns with duplicate names to block.mustInitFromRows(), and they were stored as is in the block. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
734 lines
18 KiB
Go
734 lines
18 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"math/bits"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// valueType is the type of values stored in every column block.
|
|
type valueType byte
|
|
|
|
const (
|
|
// valueTypeUnknown is used for determining whether the value type is unknown.
|
|
valueTypeUnknown = valueType(0)
|
|
|
|
// default encoding for column blocks. Strings are stored as is.
|
|
valueTypeString = valueType(1)
|
|
|
|
// column blocks with small number of unique values are encoded as dict.
|
|
valueTypeDict = valueType(2)
|
|
|
|
// uint values up to 2^8-1 are encoded into valueTypeUint8.
|
|
// Every value occupies a single byte.
|
|
valueTypeUint8 = valueType(3)
|
|
|
|
// uint values up to 2^16-1 are encoded into valueTypeUint16.
|
|
// Every value occupies 2 bytes.
|
|
valueTypeUint16 = valueType(4)
|
|
|
|
// uint values up to 2^31-1 are encoded into valueTypeUint32.
|
|
// Every value occupies 4 bytes.
|
|
valueTypeUint32 = valueType(5)
|
|
|
|
// uint values up to 2^64-1 are encoded into valueTypeUint64.
|
|
// Every value occupies 8 bytes.
|
|
valueTypeUint64 = valueType(6)
|
|
|
|
// floating-point values are encoded into valueTypeFloat64.
|
|
valueTypeFloat64 = valueType(7)
|
|
|
|
// column blocks with ipv4 addresses are encoded as 4-byte strings.
|
|
valueTypeIPv4 = valueType(8)
|
|
|
|
// column blocks with ISO8601 timestamps are encoded into valueTypeTimestampISO8601.
|
|
// These timestamps are commonly used by Logstash.
|
|
valueTypeTimestampISO8601 = valueType(9)
|
|
)
|
|
|
|
type valuesEncoder struct {
|
|
// buf contains data for values.
|
|
buf []byte
|
|
|
|
// values contains encoded values.
|
|
values []string
|
|
}
|
|
|
|
func (ve *valuesEncoder) reset() {
|
|
ve.buf = ve.buf[:0]
|
|
|
|
vs := ve.values
|
|
for i := range vs {
|
|
vs[i] = ""
|
|
}
|
|
ve.values = vs[:0]
|
|
}
|
|
|
|
// encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
|
|
func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) {
|
|
ve.reset()
|
|
|
|
if len(values) == 0 {
|
|
return valueTypeString, 0, 0
|
|
}
|
|
|
|
var vt valueType
|
|
var minValue, maxValue uint64
|
|
|
|
// Try dict encoding at first, since it gives the highest speedup during querying.
|
|
// It also usually gives the best compression, since every value is encoded as a single byte.
|
|
ve.buf, ve.values, vt = tryDictEncoding(ve.buf[:0], ve.values[:0], values, dict)
|
|
if vt != valueTypeUnknown {
|
|
return vt, 0, 0
|
|
}
|
|
|
|
ve.buf, ve.values, vt, minValue, maxValue = tryUintEncoding(ve.buf[:0], ve.values[:0], values)
|
|
if vt != valueTypeUnknown {
|
|
return vt, minValue, maxValue
|
|
}
|
|
|
|
ve.buf, ve.values, vt, minValue, maxValue = tryFloat64Encoding(ve.buf[:0], ve.values[:0], values)
|
|
if vt != valueTypeUnknown {
|
|
return vt, minValue, maxValue
|
|
}
|
|
|
|
ve.buf, ve.values, vt, minValue, maxValue = tryIPv4Encoding(ve.buf[:0], ve.values[:0], values)
|
|
if vt != valueTypeUnknown {
|
|
return vt, minValue, maxValue
|
|
}
|
|
|
|
ve.buf, ve.values, vt, minValue, maxValue = tryTimestampISO8601Encoding(ve.buf[:0], ve.values[:0], values)
|
|
if vt != valueTypeUnknown {
|
|
return vt, minValue, maxValue
|
|
}
|
|
|
|
// Fall back to default encoding, e.g. leave values as is.
|
|
ve.values = append(ve.values[:0], values...)
|
|
return valueTypeString, 0, 0
|
|
}
|
|
|
|
func getValuesEncoder() *valuesEncoder {
|
|
v := valuesEncoderPool.Get()
|
|
if v == nil {
|
|
return &valuesEncoder{}
|
|
}
|
|
return v.(*valuesEncoder)
|
|
}
|
|
|
|
func putValuesEncoder(ve *valuesEncoder) {
|
|
ve.reset()
|
|
valuesEncoderPool.Put(ve)
|
|
}
|
|
|
|
var valuesEncoderPool sync.Pool
|
|
|
|
type valuesDecoder struct {
|
|
buf []byte
|
|
}
|
|
|
|
func (vd *valuesDecoder) reset() {
|
|
vd.buf = vd.buf[:0]
|
|
}
|
|
|
|
// decodeInplace decodes values encoded with the given vt and the given dict inplace.
|
|
//
|
|
// the decoded values remain valid until vd.reset() is called.
|
|
func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valuesDict) error {
|
|
// do not reset vd.buf, since it may contain previously decoded data,
|
|
// which must be preserved until reset() call.
|
|
dstBuf := vd.buf
|
|
|
|
switch vt {
|
|
case valueTypeString:
|
|
// nothing to do - values are already decoded.
|
|
case valueTypeUint8:
|
|
for i, v := range values {
|
|
if len(v) != 1 {
|
|
return fmt.Errorf("unexpected value length for uint8; got %d; want 1", len(v))
|
|
}
|
|
n := uint64(v[0])
|
|
dstLen := len(dstBuf)
|
|
dstBuf = strconv.AppendUint(dstBuf, n, 10)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeUint16:
|
|
for i, v := range values {
|
|
if len(v) != 2 {
|
|
return fmt.Errorf("unexpected value length for uint16; got %d; want 2", len(v))
|
|
}
|
|
b := bytesutil.ToUnsafeBytes(v)
|
|
n := uint64(encoding.UnmarshalUint16(b))
|
|
dstLen := len(dstBuf)
|
|
dstBuf = strconv.AppendUint(dstBuf, n, 10)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeUint32:
|
|
for i, v := range values {
|
|
if len(v) != 4 {
|
|
return fmt.Errorf("unexpected value length for uint32; got %d; want 4", len(v))
|
|
}
|
|
b := bytesutil.ToUnsafeBytes(v)
|
|
n := uint64(encoding.UnmarshalUint32(b))
|
|
dstLen := len(dstBuf)
|
|
dstBuf = strconv.AppendUint(dstBuf, n, 10)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeUint64:
|
|
for i, v := range values {
|
|
if len(v) != 8 {
|
|
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
|
|
}
|
|
b := bytesutil.ToUnsafeBytes(v)
|
|
n := encoding.UnmarshalUint64(b)
|
|
dstLen := len(dstBuf)
|
|
dstBuf = strconv.AppendUint(dstBuf, n, 10)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeDict:
|
|
dictValues := dict.values
|
|
for i, v := range values {
|
|
id := int(v[0])
|
|
if id >= len(dictValues) {
|
|
return fmt.Errorf("unexpected dictionary id: %d; it must be smaller than %d", id, len(dictValues))
|
|
}
|
|
values[i] = dictValues[id]
|
|
}
|
|
case valueTypeIPv4:
|
|
for i, v := range values {
|
|
if len(v) != 4 {
|
|
return fmt.Errorf("unexpected value length for ipv4; got %d; want 4", len(v))
|
|
}
|
|
dstLen := len(dstBuf)
|
|
dstBuf = toIPv4String(dstBuf, v)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeTimestampISO8601:
|
|
for i, v := range values {
|
|
if len(v) != 8 {
|
|
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
|
|
}
|
|
dstLen := len(dstBuf)
|
|
dstBuf = toTimestampISO8601String(dstBuf, v)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
case valueTypeFloat64:
|
|
for i, v := range values {
|
|
if len(v) != 8 {
|
|
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
|
|
}
|
|
dstLen := len(dstBuf)
|
|
dstBuf = toFloat64String(dstBuf, v)
|
|
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
}
|
|
default:
|
|
return fmt.Errorf("unknown valueType=%d", vt)
|
|
}
|
|
|
|
vd.buf = dstBuf
|
|
return nil
|
|
}
|
|
|
|
func toTimestampISO8601String(dst []byte, v string) []byte {
|
|
b := bytesutil.ToUnsafeBytes(v)
|
|
n := encoding.UnmarshalUint64(b)
|
|
t := time.Unix(0, int64(n)).UTC()
|
|
dst = t.AppendFormat(dst, iso8601Timestamp)
|
|
return dst
|
|
}
|
|
|
|
func toIPv4String(dst []byte, v string) []byte {
|
|
dst = strconv.AppendUint(dst, uint64(v[0]), 10)
|
|
dst = append(dst, '.')
|
|
dst = strconv.AppendUint(dst, uint64(v[1]), 10)
|
|
dst = append(dst, '.')
|
|
dst = strconv.AppendUint(dst, uint64(v[2]), 10)
|
|
dst = append(dst, '.')
|
|
dst = strconv.AppendUint(dst, uint64(v[3]), 10)
|
|
return dst
|
|
}
|
|
|
|
func toFloat64String(dst []byte, v string) []byte {
|
|
b := bytesutil.ToUnsafeBytes(v)
|
|
n := encoding.UnmarshalUint64(b)
|
|
f := math.Float64frombits(n)
|
|
dst = strconv.AppendFloat(dst, f, 'g', -1, 64)
|
|
return dst
|
|
}
|
|
|
|
func getValuesDecoder() *valuesDecoder {
|
|
v := valuesDecoderPool.Get()
|
|
if v == nil {
|
|
return &valuesDecoder{}
|
|
}
|
|
return v.(*valuesDecoder)
|
|
}
|
|
|
|
func putValuesDecoder(vd *valuesDecoder) {
|
|
vd.reset()
|
|
valuesDecoderPool.Put(vd)
|
|
}
|
|
|
|
var valuesDecoderPool sync.Pool
|
|
|
|
func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
|
|
u64s := encoding.GetUint64s(len(srcValues))
|
|
defer encoding.PutUint64s(u64s)
|
|
a := u64s.A
|
|
var minValue, maxValue uint64
|
|
for i, v := range srcValues {
|
|
n, ok := tryParseTimestampISO8601(v)
|
|
if !ok {
|
|
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
|
}
|
|
a[i] = n
|
|
if i == 0 || n < minValue {
|
|
minValue = n
|
|
}
|
|
if i == 0 || n > maxValue {
|
|
maxValue = n
|
|
}
|
|
}
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint64(dstBuf, n)
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeTimestampISO8601, minValue, maxValue
|
|
}
|
|
|
|
func tryParseTimestampISO8601(s string) (uint64, bool) {
|
|
// Do not parse timestamps with timezone, since they cannot be converted back
|
|
// to the same string representation in general case.
|
|
// This may break search.
|
|
if len(s) != len("2006-01-02T15:04:05.000Z") {
|
|
return 0, false
|
|
}
|
|
|
|
// Parse year
|
|
if s[len("YYYY")] != '-' {
|
|
return 0, false
|
|
}
|
|
yearStr := s[:len("YYYY")]
|
|
n, ok := tryParseUint64(yearStr)
|
|
if !ok || n > 3000 {
|
|
return 0, false
|
|
}
|
|
year := int(n)
|
|
s = s[len("YYYY")+1:]
|
|
|
|
// Parse month
|
|
if s[len("MM")] != '-' {
|
|
return 0, false
|
|
}
|
|
monthStr := s[:len("MM")]
|
|
n, ok = tryParseUint64(monthStr)
|
|
if !ok || n < 1 || n > 12 {
|
|
return 0, false
|
|
}
|
|
month := time.Month(n)
|
|
s = s[len("MM")+1:]
|
|
|
|
// Parse day
|
|
if s[len("DD")] != 'T' {
|
|
return 0, false
|
|
}
|
|
dayStr := s[:len("DD")]
|
|
n, ok = tryParseUint64(dayStr)
|
|
if !ok || n < 1 || n > 31 {
|
|
return 0, false
|
|
}
|
|
day := int(n)
|
|
s = s[len("DD")+1:]
|
|
|
|
// Parse hour
|
|
if s[len("HH")] != ':' {
|
|
return 0, false
|
|
}
|
|
hourStr := s[:len("HH")]
|
|
n, ok = tryParseUint64(hourStr)
|
|
if !ok || n > 23 {
|
|
return 0, false
|
|
}
|
|
hour := int(n)
|
|
s = s[len("HH")+1:]
|
|
|
|
// Parse minute
|
|
if s[len("MM")] != ':' {
|
|
return 0, false
|
|
}
|
|
minuteStr := s[:len("MM")]
|
|
n, ok = tryParseUint64(minuteStr)
|
|
if !ok || n > 59 {
|
|
return 0, false
|
|
}
|
|
minute := int(n)
|
|
s = s[len("MM")+1:]
|
|
|
|
// Parse second
|
|
if s[len("SS")] != '.' {
|
|
return 0, false
|
|
}
|
|
secondStr := s[:len("SS")]
|
|
n, ok = tryParseUint64(secondStr)
|
|
if !ok || n > 59 {
|
|
return 0, false
|
|
}
|
|
second := int(n)
|
|
s = s[len("SS")+1:]
|
|
|
|
// Parse millisecond
|
|
tzDelimiter := s[len("000")]
|
|
if tzDelimiter != 'Z' {
|
|
return 0, false
|
|
}
|
|
millisecondStr := s[:len("000")]
|
|
n, ok = tryParseUint64(millisecondStr)
|
|
if !ok || n > 999 {
|
|
return 0, false
|
|
}
|
|
millisecond := int(n)
|
|
s = s[len("000")+1:]
|
|
|
|
if len(s) != 0 {
|
|
return 0, false
|
|
}
|
|
|
|
t := time.Date(year, month, day, hour, minute, second, millisecond*1e6, time.UTC)
|
|
ts := t.UnixNano()
|
|
return uint64(ts), true
|
|
}
|
|
|
|
func tryParseUint64(s string) (uint64, bool) {
|
|
if len(s) == 0 || len(s) > 18 {
|
|
return 0, false
|
|
}
|
|
n := uint64(0)
|
|
for i := 0; i < len(s); i++ {
|
|
ch := s[i]
|
|
if ch < '0' || ch > '9' {
|
|
return 0, false
|
|
}
|
|
n *= 10
|
|
n += uint64(ch - '0')
|
|
}
|
|
return n, true
|
|
}
|
|
|
|
const iso8601Timestamp = "2006-01-02T15:04:05.000Z"
|
|
|
|
func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
|
|
u32s := encoding.GetUint32s(len(srcValues))
|
|
defer encoding.PutUint32s(u32s)
|
|
a := u32s.A
|
|
var minValue, maxValue uint32
|
|
for i, v := range srcValues {
|
|
n, ok := tryParseIPv4(v)
|
|
if !ok {
|
|
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
|
}
|
|
a[i] = n
|
|
if i == 0 || n < minValue {
|
|
minValue = n
|
|
}
|
|
if i == 0 || n > maxValue {
|
|
maxValue = n
|
|
}
|
|
}
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint32(dstBuf, n)
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeIPv4, uint64(minValue), uint64(maxValue)
|
|
}
|
|
|
|
func tryParseIPv4(s string) (uint32, bool) {
|
|
if len(s) < len("1.1.1.1") || len(s) > len("255.255.255.255") || strings.Count(s, ".") != 3 {
|
|
// Fast path - the entry isn't IPv4
|
|
return 0, false
|
|
}
|
|
|
|
var octets [4]byte
|
|
var v uint64
|
|
var ok bool
|
|
|
|
// Parse octet 1
|
|
n := strings.IndexByte(s, '.')
|
|
if n <= 0 || n > 3 {
|
|
return 0, false
|
|
}
|
|
v, ok = tryParseUint64(s[:n])
|
|
if !ok || v > 255 {
|
|
return 0, false
|
|
}
|
|
octets[0] = byte(v)
|
|
s = s[n+1:]
|
|
|
|
// Parse octet 2
|
|
n = strings.IndexByte(s, '.')
|
|
if n <= 0 || n > 3 {
|
|
return 0, false
|
|
}
|
|
v, ok = tryParseUint64(s[:n])
|
|
if !ok || v > 255 {
|
|
return 0, false
|
|
}
|
|
octets[1] = byte(v)
|
|
s = s[n+1:]
|
|
|
|
// Parse octet 3
|
|
n = strings.IndexByte(s, '.')
|
|
if n <= 0 || n > 3 {
|
|
return 0, false
|
|
}
|
|
v, ok = tryParseUint64(s[:n])
|
|
if !ok || v > 255 {
|
|
return 0, false
|
|
}
|
|
octets[2] = byte(v)
|
|
s = s[n+1:]
|
|
|
|
// Parse octet 4
|
|
v, ok = tryParseUint64(s)
|
|
if !ok || v > 255 {
|
|
return 0, false
|
|
}
|
|
octets[3] = byte(v)
|
|
|
|
ipv4 := encoding.UnmarshalUint32(octets[:])
|
|
return ipv4, true
|
|
}
|
|
|
|
func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
|
|
u64s := encoding.GetUint64s(len(srcValues))
|
|
defer encoding.PutUint64s(u64s)
|
|
a := u64s.A
|
|
var minValue, maxValue float64
|
|
for i, v := range srcValues {
|
|
f, ok := tryParseFloat64(v)
|
|
if !ok {
|
|
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
|
}
|
|
a[i] = math.Float64bits(f)
|
|
if i == 0 || f < minValue {
|
|
minValue = f
|
|
}
|
|
if i == 0 || f > maxValue {
|
|
maxValue = f
|
|
}
|
|
}
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint64(dstBuf, n)
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
minValueU64 := math.Float64bits(minValue)
|
|
maxValueU64 := math.Float64bits(maxValue)
|
|
return dstBuf, dstValues, valueTypeFloat64, minValueU64, maxValueU64
|
|
}
|
|
|
|
func tryParseFloat64(s string) (float64, bool) {
|
|
if len(s) == 0 || len(s) > 20 {
|
|
return 0, false
|
|
}
|
|
// Allow only decimal digits, minus and a dot.
|
|
// Do not allows scientific notation (for example 1.23E+05),
|
|
// since it cannot be converted back to the same string form.
|
|
|
|
minus := s[0] == '-'
|
|
if minus {
|
|
s = s[1:]
|
|
}
|
|
n := strings.IndexByte(s, '.')
|
|
if n < 0 {
|
|
// fast path - there are no dots
|
|
n, ok := tryParseUint64(s)
|
|
if !ok {
|
|
return 0, false
|
|
}
|
|
f := float64(n)
|
|
if minus {
|
|
f = -f
|
|
}
|
|
return f, true
|
|
}
|
|
if n == 0 || n == len(s)-1 {
|
|
// Do not allow dots at the beginning and at the end of s,
|
|
// since they cannot be converted back to the same string form.
|
|
return 0, false
|
|
}
|
|
sInt := s[:n]
|
|
sFrac := s[n+1:]
|
|
nInt, ok := tryParseUint64(sInt)
|
|
if !ok {
|
|
return 0, false
|
|
}
|
|
nFrac, ok := tryParseUint64(sFrac)
|
|
if !ok {
|
|
return 0, false
|
|
}
|
|
f := math.FMA(float64(nFrac), math.Pow10(-len(sFrac)), float64(nInt))
|
|
if minus {
|
|
f = -f
|
|
}
|
|
return f, true
|
|
}
|
|
|
|
func tryUintEncoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
|
|
u64s := encoding.GetUint64s(len(srcValues))
|
|
defer encoding.PutUint64s(u64s)
|
|
a := u64s.A
|
|
var minValue, maxValue uint64
|
|
for i, v := range srcValues {
|
|
n, ok := tryParseUint64(v)
|
|
if !ok {
|
|
return dstBuf, dstValues, valueTypeUnknown, 0, 0
|
|
}
|
|
a[i] = n
|
|
if i == 0 || n < minValue {
|
|
minValue = n
|
|
}
|
|
if i == 0 || n > maxValue {
|
|
maxValue = n
|
|
}
|
|
}
|
|
|
|
minBitSize := bits.Len64(maxValue)
|
|
if minBitSize <= 8 {
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = append(dstBuf, byte(n))
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeUint8, minValue, maxValue
|
|
}
|
|
if minBitSize <= 16 {
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint16(dstBuf, uint16(n))
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeUint16, minValue, maxValue
|
|
}
|
|
if minBitSize <= 32 {
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint32(dstBuf, uint32(n))
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeUint32, minValue, maxValue
|
|
}
|
|
for _, n := range a {
|
|
dstLen := len(dstBuf)
|
|
dstBuf = encoding.MarshalUint64(dstBuf, n)
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeUint64, minValue, maxValue
|
|
}
|
|
|
|
func tryDictEncoding(dstBuf []byte, dstValues, srcValues []string, dict *valuesDict) ([]byte, []string, valueType) {
|
|
dict.reset()
|
|
dstBufOrig := dstBuf
|
|
dstValuesOrig := dstValues
|
|
|
|
for _, v := range srcValues {
|
|
id, ok := dict.getOrAdd(v)
|
|
if !ok {
|
|
dict.reset()
|
|
return dstBufOrig, dstValuesOrig, valueTypeUnknown
|
|
}
|
|
dstLen := len(dstBuf)
|
|
dstBuf = append(dstBuf, id)
|
|
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
|
|
dstValues = append(dstValues, v)
|
|
}
|
|
return dstBuf, dstValues, valueTypeDict
|
|
}
|
|
|
|
type valuesDict struct {
|
|
values []string
|
|
}
|
|
|
|
func (vd *valuesDict) reset() {
|
|
vs := vd.values
|
|
for i := range vs {
|
|
vs[i] = ""
|
|
}
|
|
vd.values = vs[:0]
|
|
}
|
|
|
|
func (vd *valuesDict) copyFrom(src *valuesDict) {
|
|
vd.reset()
|
|
|
|
vd.values = append(vd.values[:0], src.values...)
|
|
}
|
|
|
|
func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
|
|
if len(k) > maxDictSizeBytes {
|
|
return 0, false
|
|
}
|
|
vs := vd.values
|
|
dictSizeBytes := 0
|
|
for i, v := range vs {
|
|
if k == v {
|
|
return byte(i), true
|
|
}
|
|
dictSizeBytes += len(v)
|
|
}
|
|
if len(vs) >= maxDictLen || dictSizeBytes+len(k) > maxDictSizeBytes {
|
|
return 0, false
|
|
}
|
|
vs = append(vs, k)
|
|
vd.values = vs
|
|
|
|
return byte(len(vs) - 1), true
|
|
}
|
|
|
|
func (vd *valuesDict) marshal(dst []byte) []byte {
|
|
values := vd.values
|
|
if len(values) > maxDictLen {
|
|
logger.Panicf("BUG: valuesDict may contain max %d items; got %d items", maxDictLen, len(values))
|
|
}
|
|
dst = append(dst, byte(len(values)))
|
|
for _, v := range values {
|
|
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(v))
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
|
|
vd.reset()
|
|
|
|
srcOrig := src
|
|
if len(src) < 1 {
|
|
return srcOrig, fmt.Errorf("cannot umarshal dict len from 0 bytes; need at least 1 byte")
|
|
}
|
|
dictLen := int(src[0])
|
|
src = src[1:]
|
|
for i := 0; i < dictLen; i++ {
|
|
tail, data, err := encoding.UnmarshalBytes(src)
|
|
if err != nil {
|
|
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err)
|
|
}
|
|
src = tail
|
|
// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
|
|
v := string(data)
|
|
vd.values = append(vd.values, v)
|
|
}
|
|
return src, nil
|
|
}
|