mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/promutils: optimize LabelsCompressor.Decompress by using a specialized labelsMap struct instead of sync.Map
The labelsMap struct employs the fact that label indexes are condensed around 0, so it stores the referred labels in a slice instead of map and uses slice index as label key. This allows increasing the LabelsCompressor.Decompress performance by up to 3x. This also reduces the latency of data flush in stream aggregation.
This commit is contained in:
parent
492c6c3ff5
commit
b958135677
1 changed files with 108 additions and 12 deletions
|
@ -14,7 +14,7 @@ import (
|
|||
// LabelsCompressor compresses []prompbmarshal.Label into short binary strings
|
||||
type LabelsCompressor struct {
|
||||
labelToIdx sync.Map
|
||||
idxToLabel sync.Map
|
||||
idxToLabel labelsMap
|
||||
|
||||
nextIdx atomic.Uint64
|
||||
|
||||
|
@ -32,6 +32,8 @@ func (lc *LabelsCompressor) ItemsCount() uint64 {
|
|||
}
|
||||
|
||||
// Compress compresses labels, appends the compressed labels to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Compress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) []byte {
|
||||
if len(labels) == 0 {
|
||||
// Fast path
|
||||
|
@ -51,25 +53,25 @@ func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label)
|
|||
return
|
||||
}
|
||||
_ = dst[len(labels)-1]
|
||||
for i := range labels {
|
||||
label := &labels[i]
|
||||
v, ok := lc.labelToIdx.Load(*label)
|
||||
for i, label := range labels {
|
||||
v, ok := lc.labelToIdx.Load(label)
|
||||
if !ok {
|
||||
v = lc.nextIdx.Add(1)
|
||||
idx := lc.nextIdx.Add(1)
|
||||
v = idx
|
||||
labelCopy := cloneLabel(label)
|
||||
lc.idxToLabel.Store(v, labelCopy)
|
||||
lc.labelToIdx.Store(*labelCopy, v)
|
||||
lc.idxToLabel.Store(idx, labelCopy)
|
||||
lc.labelToIdx.Store(labelCopy, v)
|
||||
|
||||
// Update lc.totalSizeBytes
|
||||
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
|
||||
entrySizeBytes := labelSizeBytes + uint64(unsafe.Sizeof(label)+unsafe.Sizeof(*label)+2*unsafe.Sizeof(v))
|
||||
entrySizeBytes := labelSizeBytes + uint64(2*(unsafe.Sizeof(label)+unsafe.Sizeof(&label))+unsafe.Sizeof(v))
|
||||
lc.totalSizeBytes.Add(entrySizeBytes)
|
||||
}
|
||||
dst[i] = v.(uint64)
|
||||
}
|
||||
}
|
||||
|
||||
func cloneLabel(label *prompbmarshal.Label) *prompbmarshal.Label {
|
||||
func cloneLabel(label prompbmarshal.Label) prompbmarshal.Label {
|
||||
// pre-allocate memory for label name and value
|
||||
n := len(label.Name) + len(label.Value)
|
||||
buf := make([]byte, 0, n)
|
||||
|
@ -79,13 +81,15 @@ func cloneLabel(label *prompbmarshal.Label) *prompbmarshal.Label {
|
|||
|
||||
buf = append(buf, label.Value...)
|
||||
labelValue := bytesutil.ToUnsafeString(buf[len(labelName):])
|
||||
return &prompbmarshal.Label{
|
||||
return prompbmarshal.Label{
|
||||
Name: labelName,
|
||||
Value: labelValue,
|
||||
}
|
||||
}
|
||||
|
||||
// Decompress decompresses src into []prompbmarshal.Label, appends it to dst and returns the result.
|
||||
//
|
||||
// It is safe calling Decompress from concurrent goroutines.
|
||||
func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label {
|
||||
tail, labelsLen, err := encoding.UnmarshalVarUint64(src)
|
||||
if err != nil {
|
||||
|
@ -114,12 +118,104 @@ func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []
|
|||
|
||||
func (lc *LabelsCompressor) decompress(dst []prompbmarshal.Label, src []uint64) []prompbmarshal.Label {
|
||||
for _, idx := range src {
|
||||
v, ok := lc.idxToLabel.Load(idx)
|
||||
label, ok := lc.idxToLabel.Load(idx)
|
||||
if !ok {
|
||||
logger.Panicf("BUG: missing label for idx=%d", idx)
|
||||
}
|
||||
label := *(v.(*prompbmarshal.Label))
|
||||
dst = append(dst, label)
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
// labelsMap maps uint64 key to prompbmarshal.Label
|
||||
//
|
||||
// uint64 keys must be packed close to 0. Otherwise the labelsMap structure will consume too much memory.
|
||||
type labelsMap struct {
|
||||
readOnly atomic.Pointer[[]*prompbmarshal.Label]
|
||||
|
||||
mutableLock sync.Mutex
|
||||
mutable map[uint64]*prompbmarshal.Label
|
||||
misses uint64
|
||||
}
|
||||
|
||||
// Store stores label under the given idx.
|
||||
//
|
||||
// It is safe calling Store from concurrent goroutines.
|
||||
func (lm *labelsMap) Store(idx uint64, label prompbmarshal.Label) {
|
||||
lm.mutableLock.Lock()
|
||||
if lm.mutable == nil {
|
||||
lm.mutable = make(map[uint64]*prompbmarshal.Label)
|
||||
}
|
||||
lm.mutable[idx] = &label
|
||||
lm.mutableLock.Unlock()
|
||||
}
|
||||
|
||||
// Load returns the label for the given idx.
|
||||
//
|
||||
// Load returns false if lm doesn't contain label for the given idx.
|
||||
//
|
||||
// It is safe calling Load from concurrent goroutines.
|
||||
//
|
||||
// The performance of Load() scales linearly with CPU cores.
|
||||
func (lm *labelsMap) Load(idx uint64) (prompbmarshal.Label, bool) {
|
||||
if pReadOnly := lm.readOnly.Load(); pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
// Fast path - the label for the given idx has been found in lm.readOnly.
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - search in lm.mutable.
|
||||
return lm.loadSlow(idx)
|
||||
}
|
||||
|
||||
func (lm *labelsMap) loadSlow(idx uint64) (prompbmarshal.Label, bool) {
|
||||
lm.mutableLock.Lock()
|
||||
|
||||
// Try loading label from readOnly, since it could be updated while acquiring mutableLock.
|
||||
pReadOnly := lm.readOnly.Load()
|
||||
if pReadOnly != nil && idx < uint64(len(*pReadOnly)) {
|
||||
if pLabel := (*pReadOnly)[idx]; pLabel != nil {
|
||||
lm.mutableLock.Unlock()
|
||||
return *pLabel, true
|
||||
}
|
||||
}
|
||||
|
||||
// The label for the idx wasn't found in readOnly. Search it in mutable.
|
||||
lm.misses++
|
||||
pLabel := lm.mutable[idx]
|
||||
if pReadOnly == nil || lm.misses > uint64(len(*pReadOnly)) {
|
||||
lm.moveMutableToReadOnlyLocked(pReadOnly)
|
||||
lm.misses = 0
|
||||
}
|
||||
lm.mutableLock.Unlock()
|
||||
|
||||
if pLabel == nil {
|
||||
return prompbmarshal.Label{}, false
|
||||
}
|
||||
return *pLabel, true
|
||||
}
|
||||
|
||||
func (lm *labelsMap) moveMutableToReadOnlyLocked(pReadOnly *[]*prompbmarshal.Label) {
|
||||
if len(lm.mutable) == 0 {
|
||||
// Nothing to move
|
||||
return
|
||||
}
|
||||
|
||||
var labels []*prompbmarshal.Label
|
||||
if pReadOnly != nil {
|
||||
labels = append(labels, *pReadOnly...)
|
||||
}
|
||||
for idx, pLabel := range lm.mutable {
|
||||
if idx < uint64(len(labels)) {
|
||||
labels[idx] = pLabel
|
||||
} else {
|
||||
for idx > uint64(len(labels)) {
|
||||
labels = append(labels, nil)
|
||||
}
|
||||
labels = append(labels, pLabel)
|
||||
}
|
||||
}
|
||||
clear(lm.mutable)
|
||||
lm.readOnly.Store(&labels)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue