VictoriaMetrics/lib/logstorage/values_encoder.go
Aliaksandr Valialkin 7bb5f75a2a
lib/logstorage: follow-up for 94627113db
- Move uniqueFields from rows to blockStreamMerger struct.
  This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(),
  which should improve readability and maintainability of the code.

- Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock,
  since the provided logging didn't provide the solution for the issue with too many columns.
  I couldn't figure out the proper solution, which could be helpful for end user,
  so decided to remove the logging until we find the solution.

This commit also contains the following additional changes:

- It truncates field names longer than 128 chars during logs ingestion.
  This should prevent from ingesting bogus field names.
  This also should prevent from too big columnsHeader blocks,
  which could negatively affect search query performance,
  since columnsHeader is read on every scan of the corresponding data block.

- It limits the maximum length of const column value to 256.
  Longer values are stored in an ordinary columns.
  This helps limiting the size of columnsHeader blocks
  and improving search query performance by avoiding
  reading too long const columns on every scan of the corresponding data block.

- It deduplicates columns with identical names during data ingestion
  and background merging. Previously it was possible to pass columns with duplicate names
  to block.mustInitFromRows(), and they were stored as is in the block.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
2023-10-02 21:06:49 +02:00

734 lines
18 KiB
Go

package logstorage
import (
"fmt"
"math"
"math/bits"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// valueType is the type of values stored in every column block.
type valueType byte
const (
// valueTypeUnknown is used for determining whether the value type is unknown.
valueTypeUnknown = valueType(0)
// default encoding for column blocks. Strings are stored as is.
valueTypeString = valueType(1)
// column blocks with small number of unique values are encoded as dict.
valueTypeDict = valueType(2)
// uint values up to 2^8-1 are encoded into valueTypeUint8.
// Every value occupies a single byte.
valueTypeUint8 = valueType(3)
// uint values up to 2^16-1 are encoded into valueTypeUint16.
// Every value occupies 2 bytes.
valueTypeUint16 = valueType(4)
// uint values up to 2^31-1 are encoded into valueTypeUint32.
// Every value occupies 4 bytes.
valueTypeUint32 = valueType(5)
// uint values up to 2^64-1 are encoded into valueTypeUint64.
// Every value occupies 8 bytes.
valueTypeUint64 = valueType(6)
// floating-point values are encoded into valueTypeFloat64.
valueTypeFloat64 = valueType(7)
// column blocks with ipv4 addresses are encoded as 4-byte strings.
valueTypeIPv4 = valueType(8)
// column blocks with ISO8601 timestamps are encoded into valueTypeTimestampISO8601.
// These timestamps are commonly used by Logstash.
valueTypeTimestampISO8601 = valueType(9)
)
type valuesEncoder struct {
// buf contains data for values.
buf []byte
// values contains encoded values.
values []string
}
func (ve *valuesEncoder) reset() {
ve.buf = ve.buf[:0]
vs := ve.values
for i := range vs {
vs[i] = ""
}
ve.values = vs[:0]
}
// encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) {
ve.reset()
if len(values) == 0 {
return valueTypeString, 0, 0
}
var vt valueType
var minValue, maxValue uint64
// Try dict encoding at first, since it gives the highest speedup during querying.
// It also usually gives the best compression, since every value is encoded as a single byte.
ve.buf, ve.values, vt = tryDictEncoding(ve.buf[:0], ve.values[:0], values, dict)
if vt != valueTypeUnknown {
return vt, 0, 0
}
ve.buf, ve.values, vt, minValue, maxValue = tryUintEncoding(ve.buf[:0], ve.values[:0], values)
if vt != valueTypeUnknown {
return vt, minValue, maxValue
}
ve.buf, ve.values, vt, minValue, maxValue = tryFloat64Encoding(ve.buf[:0], ve.values[:0], values)
if vt != valueTypeUnknown {
return vt, minValue, maxValue
}
ve.buf, ve.values, vt, minValue, maxValue = tryIPv4Encoding(ve.buf[:0], ve.values[:0], values)
if vt != valueTypeUnknown {
return vt, minValue, maxValue
}
ve.buf, ve.values, vt, minValue, maxValue = tryTimestampISO8601Encoding(ve.buf[:0], ve.values[:0], values)
if vt != valueTypeUnknown {
return vt, minValue, maxValue
}
// Fall back to default encoding, e.g. leave values as is.
ve.values = append(ve.values[:0], values...)
return valueTypeString, 0, 0
}
func getValuesEncoder() *valuesEncoder {
v := valuesEncoderPool.Get()
if v == nil {
return &valuesEncoder{}
}
return v.(*valuesEncoder)
}
func putValuesEncoder(ve *valuesEncoder) {
ve.reset()
valuesEncoderPool.Put(ve)
}
var valuesEncoderPool sync.Pool
type valuesDecoder struct {
buf []byte
}
func (vd *valuesDecoder) reset() {
vd.buf = vd.buf[:0]
}
// decodeInplace decodes values encoded with the given vt and the given dict inplace.
//
// the decoded values remain valid until vd.reset() is called.
func (vd *valuesDecoder) decodeInplace(values []string, vt valueType, dict *valuesDict) error {
// do not reset vd.buf, since it may contain previously decoded data,
// which must be preserved until reset() call.
dstBuf := vd.buf
switch vt {
case valueTypeString:
// nothing to do - values are already decoded.
case valueTypeUint8:
for i, v := range values {
if len(v) != 1 {
return fmt.Errorf("unexpected value length for uint8; got %d; want 1", len(v))
}
n := uint64(v[0])
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint16:
for i, v := range values {
if len(v) != 2 {
return fmt.Errorf("unexpected value length for uint16; got %d; want 2", len(v))
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint16(b))
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint32:
for i, v := range values {
if len(v) != 4 {
return fmt.Errorf("unexpected value length for uint32; got %d; want 4", len(v))
}
b := bytesutil.ToUnsafeBytes(v)
n := uint64(encoding.UnmarshalUint32(b))
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeUint64:
for i, v := range values {
if len(v) != 8 {
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
}
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
dstLen := len(dstBuf)
dstBuf = strconv.AppendUint(dstBuf, n, 10)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeDict:
dictValues := dict.values
for i, v := range values {
id := int(v[0])
if id >= len(dictValues) {
return fmt.Errorf("unexpected dictionary id: %d; it must be smaller than %d", id, len(dictValues))
}
values[i] = dictValues[id]
}
case valueTypeIPv4:
for i, v := range values {
if len(v) != 4 {
return fmt.Errorf("unexpected value length for ipv4; got %d; want 4", len(v))
}
dstLen := len(dstBuf)
dstBuf = toIPv4String(dstBuf, v)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeTimestampISO8601:
for i, v := range values {
if len(v) != 8 {
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
}
dstLen := len(dstBuf)
dstBuf = toTimestampISO8601String(dstBuf, v)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
case valueTypeFloat64:
for i, v := range values {
if len(v) != 8 {
return fmt.Errorf("unexpected value length for uint64; got %d; want 8", len(v))
}
dstLen := len(dstBuf)
dstBuf = toFloat64String(dstBuf, v)
values[i] = bytesutil.ToUnsafeString(dstBuf[dstLen:])
}
default:
return fmt.Errorf("unknown valueType=%d", vt)
}
vd.buf = dstBuf
return nil
}
func toTimestampISO8601String(dst []byte, v string) []byte {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
t := time.Unix(0, int64(n)).UTC()
dst = t.AppendFormat(dst, iso8601Timestamp)
return dst
}
func toIPv4String(dst []byte, v string) []byte {
dst = strconv.AppendUint(dst, uint64(v[0]), 10)
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[1]), 10)
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[2]), 10)
dst = append(dst, '.')
dst = strconv.AppendUint(dst, uint64(v[3]), 10)
return dst
}
func toFloat64String(dst []byte, v string) []byte {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n)
dst = strconv.AppendFloat(dst, f, 'g', -1, 64)
return dst
}
func getValuesDecoder() *valuesDecoder {
v := valuesDecoderPool.Get()
if v == nil {
return &valuesDecoder{}
}
return v.(*valuesDecoder)
}
func putValuesDecoder(vd *valuesDecoder) {
vd.reset()
valuesDecoderPool.Put(vd)
}
var valuesDecoderPool sync.Pool
func tryTimestampISO8601Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)
a := u64s.A
var minValue, maxValue uint64
for i, v := range srcValues {
n, ok := tryParseTimestampISO8601(v)
if !ok {
return dstBuf, dstValues, valueTypeUnknown, 0, 0
}
a[i] = n
if i == 0 || n < minValue {
minValue = n
}
if i == 0 || n > maxValue {
maxValue = n
}
}
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint64(dstBuf, n)
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeTimestampISO8601, minValue, maxValue
}
func tryParseTimestampISO8601(s string) (uint64, bool) {
// Do not parse timestamps with timezone, since they cannot be converted back
// to the same string representation in general case.
// This may break search.
if len(s) != len("2006-01-02T15:04:05.000Z") {
return 0, false
}
// Parse year
if s[len("YYYY")] != '-' {
return 0, false
}
yearStr := s[:len("YYYY")]
n, ok := tryParseUint64(yearStr)
if !ok || n > 3000 {
return 0, false
}
year := int(n)
s = s[len("YYYY")+1:]
// Parse month
if s[len("MM")] != '-' {
return 0, false
}
monthStr := s[:len("MM")]
n, ok = tryParseUint64(monthStr)
if !ok || n < 1 || n > 12 {
return 0, false
}
month := time.Month(n)
s = s[len("MM")+1:]
// Parse day
if s[len("DD")] != 'T' {
return 0, false
}
dayStr := s[:len("DD")]
n, ok = tryParseUint64(dayStr)
if !ok || n < 1 || n > 31 {
return 0, false
}
day := int(n)
s = s[len("DD")+1:]
// Parse hour
if s[len("HH")] != ':' {
return 0, false
}
hourStr := s[:len("HH")]
n, ok = tryParseUint64(hourStr)
if !ok || n > 23 {
return 0, false
}
hour := int(n)
s = s[len("HH")+1:]
// Parse minute
if s[len("MM")] != ':' {
return 0, false
}
minuteStr := s[:len("MM")]
n, ok = tryParseUint64(minuteStr)
if !ok || n > 59 {
return 0, false
}
minute := int(n)
s = s[len("MM")+1:]
// Parse second
if s[len("SS")] != '.' {
return 0, false
}
secondStr := s[:len("SS")]
n, ok = tryParseUint64(secondStr)
if !ok || n > 59 {
return 0, false
}
second := int(n)
s = s[len("SS")+1:]
// Parse millisecond
tzDelimiter := s[len("000")]
if tzDelimiter != 'Z' {
return 0, false
}
millisecondStr := s[:len("000")]
n, ok = tryParseUint64(millisecondStr)
if !ok || n > 999 {
return 0, false
}
millisecond := int(n)
s = s[len("000")+1:]
if len(s) != 0 {
return 0, false
}
t := time.Date(year, month, day, hour, minute, second, millisecond*1e6, time.UTC)
ts := t.UnixNano()
return uint64(ts), true
}
func tryParseUint64(s string) (uint64, bool) {
if len(s) == 0 || len(s) > 18 {
return 0, false
}
n := uint64(0)
for i := 0; i < len(s); i++ {
ch := s[i]
if ch < '0' || ch > '9' {
return 0, false
}
n *= 10
n += uint64(ch - '0')
}
return n, true
}
const iso8601Timestamp = "2006-01-02T15:04:05.000Z"
func tryIPv4Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u32s := encoding.GetUint32s(len(srcValues))
defer encoding.PutUint32s(u32s)
a := u32s.A
var minValue, maxValue uint32
for i, v := range srcValues {
n, ok := tryParseIPv4(v)
if !ok {
return dstBuf, dstValues, valueTypeUnknown, 0, 0
}
a[i] = n
if i == 0 || n < minValue {
minValue = n
}
if i == 0 || n > maxValue {
maxValue = n
}
}
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint32(dstBuf, n)
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeIPv4, uint64(minValue), uint64(maxValue)
}
func tryParseIPv4(s string) (uint32, bool) {
if len(s) < len("1.1.1.1") || len(s) > len("255.255.255.255") || strings.Count(s, ".") != 3 {
// Fast path - the entry isn't IPv4
return 0, false
}
var octets [4]byte
var v uint64
var ok bool
// Parse octet 1
n := strings.IndexByte(s, '.')
if n <= 0 || n > 3 {
return 0, false
}
v, ok = tryParseUint64(s[:n])
if !ok || v > 255 {
return 0, false
}
octets[0] = byte(v)
s = s[n+1:]
// Parse octet 2
n = strings.IndexByte(s, '.')
if n <= 0 || n > 3 {
return 0, false
}
v, ok = tryParseUint64(s[:n])
if !ok || v > 255 {
return 0, false
}
octets[1] = byte(v)
s = s[n+1:]
// Parse octet 3
n = strings.IndexByte(s, '.')
if n <= 0 || n > 3 {
return 0, false
}
v, ok = tryParseUint64(s[:n])
if !ok || v > 255 {
return 0, false
}
octets[2] = byte(v)
s = s[n+1:]
// Parse octet 4
v, ok = tryParseUint64(s)
if !ok || v > 255 {
return 0, false
}
octets[3] = byte(v)
ipv4 := encoding.UnmarshalUint32(octets[:])
return ipv4, true
}
func tryFloat64Encoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)
a := u64s.A
var minValue, maxValue float64
for i, v := range srcValues {
f, ok := tryParseFloat64(v)
if !ok {
return dstBuf, dstValues, valueTypeUnknown, 0, 0
}
a[i] = math.Float64bits(f)
if i == 0 || f < minValue {
minValue = f
}
if i == 0 || f > maxValue {
maxValue = f
}
}
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint64(dstBuf, n)
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
minValueU64 := math.Float64bits(minValue)
maxValueU64 := math.Float64bits(maxValue)
return dstBuf, dstValues, valueTypeFloat64, minValueU64, maxValueU64
}
func tryParseFloat64(s string) (float64, bool) {
if len(s) == 0 || len(s) > 20 {
return 0, false
}
// Allow only decimal digits, minus and a dot.
// Do not allows scientific notation (for example 1.23E+05),
// since it cannot be converted back to the same string form.
minus := s[0] == '-'
if minus {
s = s[1:]
}
n := strings.IndexByte(s, '.')
if n < 0 {
// fast path - there are no dots
n, ok := tryParseUint64(s)
if !ok {
return 0, false
}
f := float64(n)
if minus {
f = -f
}
return f, true
}
if n == 0 || n == len(s)-1 {
// Do not allow dots at the beginning and at the end of s,
// since they cannot be converted back to the same string form.
return 0, false
}
sInt := s[:n]
sFrac := s[n+1:]
nInt, ok := tryParseUint64(sInt)
if !ok {
return 0, false
}
nFrac, ok := tryParseUint64(sFrac)
if !ok {
return 0, false
}
f := math.FMA(float64(nFrac), math.Pow10(-len(sFrac)), float64(nInt))
if minus {
f = -f
}
return f, true
}
func tryUintEncoding(dstBuf []byte, dstValues, srcValues []string) ([]byte, []string, valueType, uint64, uint64) {
u64s := encoding.GetUint64s(len(srcValues))
defer encoding.PutUint64s(u64s)
a := u64s.A
var minValue, maxValue uint64
for i, v := range srcValues {
n, ok := tryParseUint64(v)
if !ok {
return dstBuf, dstValues, valueTypeUnknown, 0, 0
}
a[i] = n
if i == 0 || n < minValue {
minValue = n
}
if i == 0 || n > maxValue {
maxValue = n
}
}
minBitSize := bits.Len64(maxValue)
if minBitSize <= 8 {
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = append(dstBuf, byte(n))
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeUint8, minValue, maxValue
}
if minBitSize <= 16 {
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint16(dstBuf, uint16(n))
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeUint16, minValue, maxValue
}
if minBitSize <= 32 {
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint32(dstBuf, uint32(n))
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeUint32, minValue, maxValue
}
for _, n := range a {
dstLen := len(dstBuf)
dstBuf = encoding.MarshalUint64(dstBuf, n)
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeUint64, minValue, maxValue
}
func tryDictEncoding(dstBuf []byte, dstValues, srcValues []string, dict *valuesDict) ([]byte, []string, valueType) {
dict.reset()
dstBufOrig := dstBuf
dstValuesOrig := dstValues
for _, v := range srcValues {
id, ok := dict.getOrAdd(v)
if !ok {
dict.reset()
return dstBufOrig, dstValuesOrig, valueTypeUnknown
}
dstLen := len(dstBuf)
dstBuf = append(dstBuf, id)
v := bytesutil.ToUnsafeString(dstBuf[dstLen:])
dstValues = append(dstValues, v)
}
return dstBuf, dstValues, valueTypeDict
}
type valuesDict struct {
values []string
}
func (vd *valuesDict) reset() {
vs := vd.values
for i := range vs {
vs[i] = ""
}
vd.values = vs[:0]
}
func (vd *valuesDict) copyFrom(src *valuesDict) {
vd.reset()
vd.values = append(vd.values[:0], src.values...)
}
func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
if len(k) > maxDictSizeBytes {
return 0, false
}
vs := vd.values
dictSizeBytes := 0
for i, v := range vs {
if k == v {
return byte(i), true
}
dictSizeBytes += len(v)
}
if len(vs) >= maxDictLen || dictSizeBytes+len(k) > maxDictSizeBytes {
return 0, false
}
vs = append(vs, k)
vd.values = vs
return byte(len(vs) - 1), true
}
func (vd *valuesDict) marshal(dst []byte) []byte {
values := vd.values
if len(values) > maxDictLen {
logger.Panicf("BUG: valuesDict may contain max %d items; got %d items", maxDictLen, len(values))
}
dst = append(dst, byte(len(values)))
for _, v := range values {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(v))
}
return dst
}
func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
vd.reset()
srcOrig := src
if len(src) < 1 {
return srcOrig, fmt.Errorf("cannot umarshal dict len from 0 bytes; need at least 1 byte")
}
dictLen := int(src[0])
src = src[1:]
for i := 0; i < dictLen; i++ {
tail, data, err := encoding.UnmarshalBytes(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err)
}
src = tail
// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
v := string(data)
vd.values = append(vd.values, v)
}
return src, nil
}