VictoriaMetrics/lib/encoding/encoding.go

371 lines
10 KiB
Go

package encoding
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// minCompressibleBlockSize is the minimum block size in bytes for trying compression.
//
// There is no sense in compressing smaller blocks.
const minCompressibleBlockSize = 128
// MarshalType is the type used for the marshaling.
type MarshalType byte
const (
// MarshalTypeZSTDNearestDelta2 is used for marshaling counter
// timeseries.
MarshalTypeZSTDNearestDelta2 = MarshalType(1)
// MarshalTypeDeltaConst is used for marshaling constantly changed
// time series with constant delta.
MarshalTypeDeltaConst = MarshalType(2)
// MarshalTypeConst is used for marshaling time series containing only
// a single constant.
MarshalTypeConst = MarshalType(3)
// MarshalTypeZSTDNearestDelta is used for marshaling gauge timeseries.
MarshalTypeZSTDNearestDelta = MarshalType(4)
// MarshalTypeNearestDelta2 is used instead of MarshalTypeZSTDNearestDelta2
// if compression doesn't help.
MarshalTypeNearestDelta2 = MarshalType(5)
// MarshalTypeNearestDelta is used instead of MarshalTypeZSTDNearestDelta
// if compression doesn't help.
MarshalTypeNearestDelta = MarshalType(6)
)
// CheckMarshalType verifies whether the mt is valid.
func CheckMarshalType(mt MarshalType) error {
if mt < 0 || mt > 6 {
return fmt.Errorf("MarshalType should be in range [0..6]; got %d", mt)
}
return nil
}
// CheckPrecisionBits makes sure precisionBits is in the range [1..64].
func CheckPrecisionBits(precisionBits uint8) error {
if precisionBits < 1 || precisionBits > 64 {
return fmt.Errorf("precisionBits must be in the range [1...64]; got %d", precisionBits)
}
return nil
}
// MarshalTimestamps marshals timestamps, appends the marshaled result
// to dst and returns the dst.
//
// timestamps must contain non-decreasing values.
//
// precisionBits must be in the range [1...64], where 1 means 50% precision,
// while 64 means 100% precision, i.e. lossless encoding.
func MarshalTimestamps(dst []byte, timestamps []int64, precisionBits uint8) (result []byte, mt MarshalType, firstTimestamp int64) {
return marshalInt64Array(dst, timestamps, precisionBits)
}
// UnmarshalTimestamps unmarshals timestamps from src, appends them to dst
// and returns the resulting dst.
//
// firstTimestamp must be the timestamp returned from MarshalTimestamps.
func UnmarshalTimestamps(dst []int64, src []byte, mt MarshalType, firstTimestamp int64, itemsCount int) ([]int64, error) {
dst, err := unmarshalInt64Array(dst, src, mt, firstTimestamp, itemsCount)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal %d timestamps from len(src)=%d bytes: %s", itemsCount, len(src), err)
}
return dst, nil
}
// MarshalValues marshals values, appends the marshaled result to dst
// and returns the dst.
//
// precisionBits must be in the range [1...64], where 1 means 50% precision,
// while 64 means 100% precision, i.e. lossless encoding.
func MarshalValues(dst []byte, values []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {
return marshalInt64Array(dst, values, precisionBits)
}
// UnmarshalValues unmarshals values from src, appends them to dst and returns
// the resulting dst.
//
// firstValue must be the value returned from MarshalValues.
func UnmarshalValues(dst []int64, src []byte, mt MarshalType, firstValue int64, itemsCount int) ([]int64, error) {
dst, err := unmarshalInt64Array(dst, src, mt, firstValue, itemsCount)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal %d values from len(src)=%d bytes: %s", itemsCount, len(src), err)
}
return dst, nil
}
func marshalInt64Array(dst []byte, a []int64, precisionBits uint8) (result []byte, mt MarshalType, firstValue int64) {
if len(a) == 0 {
logger.Panicf("BUG: a must contain at least one item")
}
if isConst(a) {
firstValue = a[0]
return dst, MarshalTypeConst, firstValue
}
if isDeltaConst(a) {
firstValue = a[0]
dst = MarshalVarInt64(dst, a[1]-a[0])
return dst, MarshalTypeDeltaConst, firstValue
}
bb := bbPool.Get()
if isGauge(a) {
// Gauge values are better compressed with delta encoding.
mt = MarshalTypeZSTDNearestDelta
pb := precisionBits
if pb < 6 {
// Increase precision bits for gauges, since they suffer more
// from low precision bits comparing to counters.
pb += 2
}
bb.B, firstValue = marshalInt64NearestDelta(bb.B[:0], a, pb)
} else {
// Non-gauge values, i.e. counters are better compressed with delta2 encoding.
mt = MarshalTypeZSTDNearestDelta2
bb.B, firstValue = marshalInt64NearestDelta2(bb.B[:0], a, precisionBits)
}
// Try compressing the result.
dstOrig := dst
if len(bb.B) >= minCompressibleBlockSize {
compressLevel := getCompressLevel(len(a))
dst = CompressZSTDLevel(dst, bb.B, compressLevel)
}
if len(bb.B) < minCompressibleBlockSize || float64(len(dst)-len(dstOrig)) > 0.9*float64(len(bb.B)) {
// Ineffective compression. Store plain data.
switch mt {
case MarshalTypeZSTDNearestDelta2:
mt = MarshalTypeNearestDelta2
case MarshalTypeZSTDNearestDelta:
mt = MarshalTypeNearestDelta
default:
logger.Panicf("BUG: unexpected mt=%d", mt)
}
dst = append(dstOrig, bb.B...)
}
bbPool.Put(bb)
return dst, mt, firstValue
}
func unmarshalInt64Array(dst []int64, src []byte, mt MarshalType, firstValue int64, itemsCount int) ([]int64, error) {
// Extend dst capacity in order to eliminate memory allocations below.
dst = decimal.ExtendInt64sCapacity(dst, itemsCount)
var err error
switch mt {
case MarshalTypeZSTDNearestDelta:
bb := bbPool.Get()
bb.B, err = DecompressZSTD(bb.B[:0], src)
if err != nil {
return nil, fmt.Errorf("cannot decompress zstd data of size %d: %s; src_zstd=%X", len(src), err, src)
}
dst, err = unmarshalInt64NearestDelta(dst, bb.B, firstValue, itemsCount)
bbPool.Put(bb)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal nearest delta data after zstd decompression: %s; src_zstd=%X", err, src)
}
return dst, nil
case MarshalTypeZSTDNearestDelta2:
bb := bbPool.Get()
bb.B, err = DecompressZSTD(bb.B[:0], src)
if err != nil {
return nil, fmt.Errorf("cannot decompress zstd data of size %d: %s; src_zstd=%X", len(src), err, src)
}
dst, err = unmarshalInt64NearestDelta2(dst, bb.B, firstValue, itemsCount)
bbPool.Put(bb)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal nearest delta2 data after zstd decompression: %s; src_zstd=%X", err, src)
}
return dst, nil
case MarshalTypeNearestDelta:
dst, err = unmarshalInt64NearestDelta(dst, src, firstValue, itemsCount)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal nearest delta data: %s", err)
}
return dst, nil
case MarshalTypeNearestDelta2:
dst, err = unmarshalInt64NearestDelta2(dst, src, firstValue, itemsCount)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal nearest delta2 data: %s", err)
}
return dst, nil
case MarshalTypeConst:
if len(src) > 0 {
return nil, fmt.Errorf("unexpected data left in const encoding: %d bytes", len(src))
}
if firstValue == 0 {
dst = fastnum.AppendInt64Zeros(dst, itemsCount)
return dst, nil
}
if firstValue == 1 {
dst = fastnum.AppendInt64Ones(dst, itemsCount)
return dst, nil
}
for itemsCount > 0 {
dst = append(dst, firstValue)
itemsCount--
}
return dst, nil
case MarshalTypeDeltaConst:
v := firstValue
tail, d, err := UnmarshalVarInt64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal delta value for delta const: %s", err)
}
if len(tail) > 0 {
return nil, fmt.Errorf("unexpected trailing data after delta const (d=%d): %d bytes", d, len(tail))
}
for itemsCount > 0 {
dst = append(dst, v)
itemsCount--
v += d
}
return dst, nil
default:
return nil, fmt.Errorf("unknown MarshalType=%d", mt)
}
}
var bbPool bytesutil.ByteBufferPool
// EnsureNonDecreasingSequence makes sure the first item in a is vMin, the last
// item in a is vMax and all the items in a are non-decreasing.
//
// If this isn't the case the a is fixed accordingly.
func EnsureNonDecreasingSequence(a []int64, vMin, vMax int64) {
if vMax < vMin {
logger.Panicf("BUG: vMax cannot be smaller than vMin; got %d vs %d", vMax, vMin)
}
if len(a) == 0 {
return
}
if a[0] != vMin {
a[0] = vMin
}
vPrev := a[0]
aa := a[1:]
for i, v := range aa {
if v < vPrev {
aa[i] = vPrev
v = vPrev
}
vPrev = v
}
i := len(a) - 1
if a[i] != vMax {
a[i] = vMax
i--
for i >= 0 && a[i] > vMax {
a[i] = vMax
i--
}
}
}
// isConst returns true if a contains only equal values.
func isConst(a []int64) bool {
if len(a) == 0 {
return false
}
if fastnum.IsInt64Zeros(a) {
// Fast path for array containing only zeros.
return true
}
if fastnum.IsInt64Ones(a) {
// Fast path for array containing only ones.
return true
}
v1 := a[0]
for _, v := range a {
if v != v1 {
return false
}
}
return true
}
// isDeltaConst returns true if a contains counter with constant delta.
func isDeltaConst(a []int64) bool {
if len(a) < 2 {
return false
}
d1 := a[1] - a[0]
prev := a[1]
for _, next := range a[2:] {
if next-prev != d1 {
return false
}
prev = next
}
return true
}
// isGauge returns true if a contains gauge values,
// i.e. arbitrary changing values.
//
// It is OK if a few gauges aren't detected (i.e. detected as counters),
// since misdetected counters as gauges leads to worser compression ratio.
func isGauge(a []int64) bool {
// Check all the items in a, since a part of items may lead
// to incorrect gauge detection.
if len(a) < 2 {
return false
}
resets := 0
vPrev := a[0]
if vPrev < 0 {
// Counter values cannot be negative.
return true
}
for _, v := range a[1:] {
if v < vPrev {
if v < 0 {
// Counter values cannot be negative.
return true
}
if v > (vPrev >> 3) {
// Decreasing sequence detected.
// This is a gauge.
return true
}
// Possible counter reset.
resets++
}
vPrev = v
}
if resets <= 2 {
// Counter with a few resets.
return false
}
// Let it be a gauge if resets exceeds len(a)/8,
// otherwise assume counter.
return resets > (len(a) >> 3)
}
func getCompressLevel(itemsCount int) int {
if itemsCount <= 1<<6 {
return 1
}
if itemsCount <= 1<<8 {
return 2
}
if itemsCount <= 1<<10 {
return 3
}
if itemsCount <= 1<<12 {
return 4
}
return 5
}