mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
fbfd7415da
It allows to modify exist series values. User must write modified series into vminsert API /insert/0/prometheus/api/v1/update/series vminsert will generate id and add it to the series as __generation_id label. Modified series merged at vmselect side. Only last series modify request at given time range will be applied. Modification request could be exported with the following API request: `curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'reduce_mem_usage=true' -d 'match[]={__generation_id!=""}'` https://github.com/VictoriaMetrics/VictoriaMetrics/issues/844 adds guide allow single datapoint modification vmselectapi: prevent MetricBlockRef corruption Modofying of MetricName byte slice may result into MetricBlockRef corruption, since `ctx.mb.MetricName` is a pointer to `MetricBlockRef.MetricName`. Signed-off-by: hagen1778 <roman@victoriametrics.com> Revert "vmselectapi: prevent MetricBlockRef corruption" This reverts commit cf36bfa1895885fcc7dc2673248ee56c78180ea0. app/vmstorage/servers: properly copy MetricName into MetricBlock inside blockIterator.NextBlock This should fix the issue atcf36bfa189
(cherry picked from commit916f1ab86c
) app/vmselect: correctly update single datapoint at merge app/vmselect: adds mutex for series update map previously it was sync api, but function signature was changed for performance optimizations
949 lines
24 KiB
Go
949 lines
24 KiB
Go
package storage
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
|
)
|
|
|
|
const (
|
|
escapeChar = 0
|
|
tagSeparatorChar = 1
|
|
kvSeparatorChar = 2
|
|
)
|
|
|
|
// Tag represents a (key, value) tag for metric.
|
|
type Tag struct {
|
|
Key []byte
|
|
Value []byte
|
|
}
|
|
|
|
// Reset resets the tag.
|
|
func (tag *Tag) Reset() {
|
|
tag.Key = tag.Key[:0]
|
|
tag.Value = tag.Value[:0]
|
|
}
|
|
|
|
// Equal returns true if tag equals t
|
|
func (tag *Tag) Equal(t *Tag) bool {
|
|
return string(tag.Key) == string(t.Key) && string(tag.Value) == string(t.Value)
|
|
}
|
|
|
|
// Marshal appends marshaled tag to dst and returns the result.
|
|
func (tag *Tag) Marshal(dst []byte) []byte {
|
|
dst = marshalTagValue(dst, tag.Key)
|
|
dst = marshalTagValue(dst, tag.Value)
|
|
return dst
|
|
}
|
|
|
|
// Unmarshal unmarshals tag from src and returns the remaining data from src.
|
|
func (tag *Tag) Unmarshal(src []byte) ([]byte, error) {
|
|
var err error
|
|
src, tag.Key, err = unmarshalTagValue(tag.Key[:0], src)
|
|
if err != nil {
|
|
return src, fmt.Errorf("cannot unmarshal key: %w", err)
|
|
}
|
|
src, tag.Value, err = unmarshalTagValue(tag.Value[:0], src)
|
|
if err != nil {
|
|
return src, fmt.Errorf("cannot unmarshal value: %w", err)
|
|
}
|
|
return src, nil
|
|
}
|
|
|
|
func (tag *Tag) copyFrom(src *Tag) {
|
|
tag.Key = append(tag.Key[:0], src.Key...)
|
|
tag.Value = append(tag.Value[:0], src.Value...)
|
|
}
|
|
|
|
func marshalTagValueNoTrailingTagSeparator(dst []byte, src string) []byte {
|
|
dst = marshalTagValue(dst, bytesutil.ToUnsafeBytes(src))
|
|
// Remove trailing tagSeparatorChar
|
|
return dst[:len(dst)-1]
|
|
}
|
|
|
|
func marshalTagValue(dst, src []byte) []byte {
|
|
n1 := bytes.IndexByte(src, escapeChar)
|
|
n2 := bytes.IndexByte(src, tagSeparatorChar)
|
|
n3 := bytes.IndexByte(src, kvSeparatorChar)
|
|
if n1 < 0 && n2 < 0 && n3 < 0 {
|
|
// Fast path.
|
|
dst = append(dst, src...)
|
|
dst = append(dst, tagSeparatorChar)
|
|
return dst
|
|
}
|
|
|
|
// Slow path.
|
|
for _, ch := range src {
|
|
switch ch {
|
|
case escapeChar:
|
|
dst = append(dst, escapeChar, '0')
|
|
case tagSeparatorChar:
|
|
dst = append(dst, escapeChar, '1')
|
|
case kvSeparatorChar:
|
|
dst = append(dst, escapeChar, '2')
|
|
default:
|
|
dst = append(dst, ch)
|
|
}
|
|
}
|
|
|
|
dst = append(dst, tagSeparatorChar)
|
|
return dst
|
|
}
|
|
|
|
func unmarshalTagValue(dst, src []byte) ([]byte, []byte, error) {
|
|
n := bytes.IndexByte(src, tagSeparatorChar)
|
|
if n < 0 {
|
|
return src, dst, fmt.Errorf("cannot find the end of tag value")
|
|
}
|
|
b := src[:n]
|
|
src = src[n+1:]
|
|
for {
|
|
n := bytes.IndexByte(b, escapeChar)
|
|
if n < 0 {
|
|
dst = append(dst, b...)
|
|
return src, dst, nil
|
|
}
|
|
dst = append(dst, b[:n]...)
|
|
b = b[n+1:]
|
|
if len(b) == 0 {
|
|
return src, dst, fmt.Errorf("missing escaped char")
|
|
}
|
|
switch b[0] {
|
|
case '0':
|
|
dst = append(dst, escapeChar)
|
|
case '1':
|
|
dst = append(dst, tagSeparatorChar)
|
|
case '2':
|
|
dst = append(dst, kvSeparatorChar)
|
|
default:
|
|
return src, dst, fmt.Errorf("unsupported escaped char: %c", b[0])
|
|
}
|
|
b = b[1:]
|
|
}
|
|
}
|
|
|
|
// MetricName reperesents a metric name.
|
|
type MetricName struct {
|
|
AccountID uint32
|
|
ProjectID uint32
|
|
|
|
MetricGroup []byte
|
|
|
|
// Tags are optional. They must be sorted by tag Key for canonical view.
|
|
// Use sortTags method.
|
|
Tags []Tag
|
|
}
|
|
|
|
// GetMetricName returns a MetricName from pool.
|
|
func GetMetricName() *MetricName {
|
|
v := mnPool.Get()
|
|
if v == nil {
|
|
return &MetricName{}
|
|
}
|
|
return v.(*MetricName)
|
|
}
|
|
|
|
// PutMetricName returns mn to the pool.
|
|
func PutMetricName(mn *MetricName) {
|
|
mn.Reset()
|
|
mnPool.Put(mn)
|
|
}
|
|
|
|
var mnPool sync.Pool
|
|
|
|
// Reset resets the mn.
|
|
func (mn *MetricName) Reset() {
|
|
mn.AccountID = 0
|
|
mn.ProjectID = 0
|
|
mn.MetricGroup = mn.MetricGroup[:0]
|
|
mn.Tags = mn.Tags[:0]
|
|
}
|
|
|
|
// MoveFrom moves src to mn.
|
|
//
|
|
// The src is reset after the call.
|
|
func (mn *MetricName) MoveFrom(src *MetricName) {
|
|
*mn = *src
|
|
*src = MetricName{}
|
|
}
|
|
|
|
// CopyFrom copies src to mn.
|
|
func (mn *MetricName) CopyFrom(src *MetricName) {
|
|
mn.AccountID = src.AccountID
|
|
mn.ProjectID = src.ProjectID
|
|
if cap(mn.MetricGroup) > 0 {
|
|
mn.MetricGroup = append(mn.MetricGroup[:0], src.MetricGroup...)
|
|
mn.Tags = copyTags(mn.Tags[:0], src.Tags)
|
|
return
|
|
}
|
|
|
|
// Pre-allocate a single byte slice for MetricGroup + all the tags.
|
|
// This reduces the number of memory allocations for zero mn.
|
|
size := len(src.MetricGroup)
|
|
for i := range src.Tags {
|
|
tag := &src.Tags[i]
|
|
size += len(tag.Key)
|
|
size += len(tag.Value)
|
|
}
|
|
b := make([]byte, 0, size)
|
|
|
|
b = append(b, src.MetricGroup...)
|
|
mn.MetricGroup = b[:len(b):len(b)]
|
|
|
|
mn.Tags = make([]Tag, len(src.Tags))
|
|
for i := range src.Tags {
|
|
st := &src.Tags[i]
|
|
dt := &mn.Tags[i]
|
|
b = append(b, st.Key...)
|
|
dt.Key = b[len(b)-len(st.Key) : len(b) : len(b)]
|
|
b = append(b, st.Value...)
|
|
dt.Value = b[len(b)-len(st.Value) : len(b) : len(b)]
|
|
}
|
|
}
|
|
|
|
// AddTag adds new tag to mn with the given key and value.
|
|
func (mn *MetricName) AddTag(key, value string) {
|
|
if key == string(metricGroupTagKey) {
|
|
mn.MetricGroup = append(mn.MetricGroup, value...)
|
|
return
|
|
}
|
|
tag := mn.addNextTag()
|
|
tag.Key = append(tag.Key[:0], key...)
|
|
tag.Value = append(tag.Value[:0], value...)
|
|
}
|
|
|
|
// AddTagBytes adds new tag to mn with the given key and value.
|
|
func (mn *MetricName) AddTagBytes(key, value []byte) {
|
|
if string(key) == string(metricGroupTagKey) {
|
|
mn.MetricGroup = append(mn.MetricGroup, value...)
|
|
return
|
|
}
|
|
tag := mn.addNextTag()
|
|
tag.Key = append(tag.Key[:0], key...)
|
|
tag.Value = append(tag.Value[:0], value...)
|
|
}
|
|
|
|
func (mn *MetricName) addNextTag() *Tag {
|
|
if len(mn.Tags) < cap(mn.Tags) {
|
|
mn.Tags = mn.Tags[:len(mn.Tags)+1]
|
|
} else {
|
|
mn.Tags = append(mn.Tags, Tag{})
|
|
}
|
|
return &mn.Tags[len(mn.Tags)-1]
|
|
}
|
|
|
|
// ResetMetricGroup resets mn.MetricGroup
|
|
func (mn *MetricName) ResetMetricGroup() {
|
|
mn.MetricGroup = mn.MetricGroup[:0]
|
|
}
|
|
|
|
var metricGroupTagKey = []byte("__name__")
|
|
|
|
// RemoveTagsOn removes all the tags not included to onTags.
|
|
func (mn *MetricName) RemoveTagsOn(onTags []string) {
|
|
if !hasTag(onTags, metricGroupTagKey) {
|
|
mn.ResetMetricGroup()
|
|
}
|
|
tags := mn.Tags
|
|
mn.Tags = mn.Tags[:0]
|
|
if len(onTags) == 0 {
|
|
return
|
|
}
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if hasTag(onTags, tag.Key) {
|
|
mn.AddTagBytes(tag.Key, tag.Value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// RemoveTag removes a tag with the given tagKey
|
|
func (mn *MetricName) RemoveTag(tagKey string) {
|
|
if tagKey == "__name__" {
|
|
mn.ResetMetricGroup()
|
|
return
|
|
}
|
|
tags := mn.Tags
|
|
mn.Tags = mn.Tags[:0]
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if string(tag.Key) != tagKey {
|
|
mn.AddTagBytes(tag.Key, tag.Value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// RemoveTagWithResult removes a tag with the given tagKey and returns removed Tag
|
|
func (mn *MetricName) RemoveTagWithResult(tagKey string) *Tag {
|
|
if tagKey == "__name__" {
|
|
mn.ResetMetricGroup()
|
|
return nil
|
|
}
|
|
tags := mn.Tags
|
|
mn.Tags = mn.Tags[:0]
|
|
var foundTag *Tag
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if string(tag.Key) != tagKey {
|
|
mn.AddTagBytes(tag.Key, tag.Value)
|
|
continue
|
|
}
|
|
var t Tag
|
|
t.copyFrom(tag)
|
|
foundTag = &t
|
|
}
|
|
return foundTag
|
|
}
|
|
|
|
// RemoveTagsIgnoring removes all the tags included in ignoringTags.
|
|
func (mn *MetricName) RemoveTagsIgnoring(ignoringTags []string) {
|
|
if len(ignoringTags) == 0 {
|
|
return
|
|
}
|
|
if hasTag(ignoringTags, metricGroupTagKey) {
|
|
mn.ResetMetricGroup()
|
|
}
|
|
tags := mn.Tags
|
|
mn.Tags = mn.Tags[:0]
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if !hasTag(ignoringTags, tag.Key) {
|
|
mn.AddTagBytes(tag.Key, tag.Value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetTagValue returns tag value for the given tagKey.
|
|
func (mn *MetricName) GetTagValue(tagKey string) []byte {
|
|
if tagKey == "__name__" {
|
|
return mn.MetricGroup
|
|
}
|
|
tags := mn.Tags
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if string(tag.Key) == tagKey {
|
|
return tag.Value
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetTags sets tags from src with keys matching addTags.
|
|
//
|
|
// It adds prefix to copied label names.
|
|
// skipTags contains a list of tags, which must be skipped.
|
|
func (mn *MetricName) SetTags(addTags []string, prefix string, skipTags []string, src *MetricName) {
|
|
if len(addTags) == 1 && addTags[0] == "*" {
|
|
// Special case for copying all the tags except of skipTags from src to mn.
|
|
mn.setAllTags(prefix, skipTags, src)
|
|
return
|
|
}
|
|
bb := bbPool.Get()
|
|
for _, tagName := range addTags {
|
|
if containsString(skipTags, tagName) {
|
|
continue
|
|
}
|
|
if tagName == string(metricGroupTagKey) {
|
|
mn.MetricGroup = append(mn.MetricGroup[:0], src.MetricGroup...)
|
|
continue
|
|
}
|
|
var srcTag *Tag
|
|
for i := range src.Tags {
|
|
t := &src.Tags[i]
|
|
if string(t.Key) == tagName {
|
|
srcTag = t
|
|
break
|
|
}
|
|
}
|
|
if srcTag == nil {
|
|
mn.RemoveTag(tagName)
|
|
continue
|
|
}
|
|
bb.B = append(bb.B[:0], prefix...)
|
|
bb.B = append(bb.B, tagName...)
|
|
mn.SetTagBytes(bb.B, srcTag.Value)
|
|
}
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
var bbPool bytesutil.ByteBufferPool
|
|
|
|
// SetTagBytes sets tag with the given key to the given value.
|
|
func (mn *MetricName) SetTagBytes(key, value []byte) {
|
|
for i := range mn.Tags {
|
|
t := &mn.Tags[i]
|
|
if string(t.Key) == string(key) {
|
|
t.Value = append(t.Value[:0], value...)
|
|
return
|
|
}
|
|
}
|
|
mn.AddTagBytes(key, value)
|
|
}
|
|
|
|
func (mn *MetricName) setAllTags(prefix string, skipTags []string, src *MetricName) {
|
|
bb := bbPool.Get()
|
|
for _, tag := range src.Tags {
|
|
if containsString(skipTags, bytesutil.ToUnsafeString(tag.Key)) {
|
|
continue
|
|
}
|
|
bb.B = append(bb.B[:0], prefix...)
|
|
bb.B = append(bb.B, tag.Key...)
|
|
mn.SetTagBytes(bb.B, tag.Value)
|
|
}
|
|
bbPool.Put(bb)
|
|
}
|
|
|
|
func containsString(a []string, s string) bool {
|
|
for _, x := range a {
|
|
if x == s {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func hasTag(tags []string, key []byte) bool {
|
|
for _, t := range tags {
|
|
if t == string(key) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// String returns user-readable representation of the metric name.
|
|
func (mn *MetricName) String() string {
|
|
var mnCopy MetricName
|
|
mnCopy.CopyFrom(mn)
|
|
mnCopy.sortTags()
|
|
var tags []string
|
|
for i := range mnCopy.Tags {
|
|
t := &mnCopy.Tags[i]
|
|
tags = append(tags, fmt.Sprintf("%s=%q", t.Key, t.Value))
|
|
}
|
|
tagsStr := strings.Join(tags, ",")
|
|
return fmt.Sprintf("AccountID=%d, ProjectID=%d, %s{%s}", mnCopy.AccountID, mnCopy.ProjectID, mnCopy.MetricGroup, tagsStr)
|
|
}
|
|
|
|
// Marshal appends marshaled mn to dst and returns the result.
|
|
//
|
|
// mn.sortTags must be called before calling this function
|
|
// in order to sort and de-duplcate tags.
|
|
func (mn *MetricName) Marshal(dst []byte) []byte {
|
|
// Calculate the required size and pre-allocate space in dst
|
|
dstLen := len(dst)
|
|
requiredSize := 8 + len(mn.MetricGroup) + 1
|
|
for i := range mn.Tags {
|
|
tag := &mn.Tags[i]
|
|
requiredSize += len(tag.Key) + len(tag.Value) + 2
|
|
}
|
|
dst = bytesutil.ResizeWithCopyMayOverallocate(dst, requiredSize)[:dstLen]
|
|
|
|
dst = encoding.MarshalUint32(dst, mn.AccountID)
|
|
dst = encoding.MarshalUint32(dst, mn.ProjectID)
|
|
dst = marshalTagValue(dst, mn.MetricGroup)
|
|
|
|
// Marshal tags.
|
|
tags := mn.Tags
|
|
for i := range tags {
|
|
t := &tags[i]
|
|
dst = t.Marshal(dst)
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// UnmarshalString unmarshals mn from s
|
|
func (mn *MetricName) UnmarshalString(s string) error {
|
|
b := bytesutil.ToUnsafeBytes(s)
|
|
err := mn.Unmarshal(b)
|
|
runtime.KeepAlive(s)
|
|
return err
|
|
}
|
|
|
|
// Unmarshal unmarshals mn from src.
|
|
func (mn *MetricName) Unmarshal(src []byte) error {
|
|
if len(src) < 8 {
|
|
return fmt.Errorf("too short src: %d bytes; must be at least 8 bytes", len(src))
|
|
}
|
|
mn.AccountID = encoding.UnmarshalUint32(src)
|
|
mn.ProjectID = encoding.UnmarshalUint32(src[4:])
|
|
src = src[8:]
|
|
|
|
// Unmarshal MetricGroup.
|
|
var err error
|
|
src, mn.MetricGroup, err = unmarshalTagValue(mn.MetricGroup[:0], src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal MetricGroup: %w", err)
|
|
}
|
|
|
|
mn.Tags = mn.Tags[:0]
|
|
for len(src) > 0 {
|
|
tag := mn.addNextTag()
|
|
var err error
|
|
src, err = tag.Unmarshal(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal tag: %w", err)
|
|
}
|
|
}
|
|
|
|
// There is no need in verifying for identical tag keys,
|
|
// since they must be handled by MetricName.sortTags before calling MetricName.Marshal.
|
|
|
|
return nil
|
|
}
|
|
|
|
// MarshalNoAccountIDProjectID appends marshaled mn without AccountID and ProjectID
|
|
// to dst and returns the result.
|
|
//
|
|
// The result must be unmarshaled with UnmarshalNoAccountIDProjectID.
|
|
//
|
|
// It is expected that mn.Tags are already sorted and de-duplicated with mn.sortTags.
|
|
func (mn *MetricName) MarshalNoAccountIDProjectID(dst []byte) []byte {
|
|
// Calculate the required size and pre-allocate space in dst
|
|
dstLen := len(dst)
|
|
requiredSize := len(mn.MetricGroup) + 1
|
|
for i := range mn.Tags {
|
|
tag := &mn.Tags[i]
|
|
requiredSize += len(tag.Key) + len(tag.Value) + 2
|
|
}
|
|
dst = bytesutil.ResizeWithCopyMayOverallocate(dst, requiredSize)[:dstLen]
|
|
|
|
dst = marshalTagValue(dst, mn.MetricGroup)
|
|
tags := mn.Tags
|
|
for i := range tags {
|
|
t := &tags[i]
|
|
dst = t.Marshal(dst)
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// UnmarshalNoAccountIDProjectID unmarshals mn, which has been marshaled with MarshalNoAccountIDProjectID
|
|
func (mn *MetricName) UnmarshalNoAccountIDProjectID(src []byte) error {
|
|
mn.AccountID = 0
|
|
mn.ProjectID = 0
|
|
|
|
// Unmarshal MetricGroup.
|
|
var err error
|
|
src, mn.MetricGroup, err = unmarshalTagValue(mn.MetricGroup[:0], src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal MetricGroup: %w", err)
|
|
}
|
|
|
|
mn.Tags = mn.Tags[:0]
|
|
for len(src) > 0 {
|
|
tag := mn.addNextTag()
|
|
var err error
|
|
src, err = tag.Unmarshal(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot unmarshal tag: %w", err)
|
|
}
|
|
}
|
|
|
|
// There is no need in verifying for identical tag keys,
|
|
// since they must be handled by the caller (/api/v1/import/native)
|
|
return nil
|
|
}
|
|
|
|
// The maximum length of label name.
|
|
//
|
|
// Longer names are truncated.
|
|
const maxLabelNameLen = 256
|
|
|
|
// The maximum length of label value.
|
|
//
|
|
// Longer values are truncated.
|
|
var maxLabelValueLen = 16 * 1024
|
|
|
|
// SetMaxLabelValueLen sets the limit on the label value length.
|
|
//
|
|
// This function can be called before using the storage package.
|
|
//
|
|
// Label values with longer length are truncated.
|
|
func SetMaxLabelValueLen(n int) {
|
|
if n > 0 {
|
|
maxLabelValueLen = n
|
|
}
|
|
}
|
|
|
|
// The maximum number of labels per each timeseries.
|
|
var maxLabelsPerTimeseries = 30
|
|
|
|
// SetMaxLabelsPerTimeseries sets the limit on the number of labels
|
|
// per each time series.
|
|
//
|
|
// This function can be called before using the storage package.
|
|
//
|
|
// Superfluous labels are dropped.
|
|
func SetMaxLabelsPerTimeseries(maxLabels int) {
|
|
if maxLabels > 0 {
|
|
maxLabelsPerTimeseries = maxLabels
|
|
}
|
|
}
|
|
|
|
// MarshalMetricNameRaw marshals labels to dst and returns the result.
|
|
//
|
|
// The result must be unmarshaled with MetricName.UnmarshalRaw
|
|
func MarshalMetricNameRaw(dst []byte, accountID, projectID uint32, labels []prompb.Label) []byte {
|
|
// Calculate the required space for dst.
|
|
dstLen := len(dst)
|
|
dstSize := dstLen + 8
|
|
for i := range labels {
|
|
if i >= maxLabelsPerTimeseries {
|
|
trackDroppedLabels(labels, labels[i:])
|
|
break
|
|
}
|
|
label := &labels[i]
|
|
if len(label.Name) > maxLabelNameLen {
|
|
atomic.AddUint64(&TooLongLabelNames, 1)
|
|
label.Name = label.Name[:maxLabelNameLen]
|
|
}
|
|
if len(label.Value) > maxLabelValueLen {
|
|
trackTruncatedLabels(labels, label)
|
|
label.Value = label.Value[:maxLabelValueLen]
|
|
}
|
|
if len(label.Value) == 0 {
|
|
// Skip labels without values, since they have no sense in prometheus.
|
|
continue
|
|
}
|
|
if string(label.Name) == "__name__" {
|
|
label.Name = label.Name[:0]
|
|
}
|
|
dstSize += len(label.Name)
|
|
dstSize += len(label.Value)
|
|
dstSize += 4
|
|
}
|
|
dst = bytesutil.ResizeWithCopyMayOverallocate(dst, dstSize)[:dstLen]
|
|
|
|
// Marshal labels to dst.
|
|
dst = encoding.MarshalUint32(dst, accountID)
|
|
dst = encoding.MarshalUint32(dst, projectID)
|
|
for i := range labels {
|
|
if i >= maxLabelsPerTimeseries {
|
|
break
|
|
}
|
|
label := &labels[i]
|
|
if len(label.Value) == 0 {
|
|
// Skip labels without values, since they have no sense in prometheus.
|
|
continue
|
|
}
|
|
dst = marshalBytesFast(dst, label.Name)
|
|
dst = marshalBytesFast(dst, label.Value)
|
|
}
|
|
return dst
|
|
}
|
|
|
|
var (
|
|
// MetricsWithDroppedLabels is the number of metrics with at least a single dropped label
|
|
MetricsWithDroppedLabels uint64
|
|
|
|
// TooLongLabelNames is the number of too long label names
|
|
TooLongLabelNames uint64
|
|
|
|
// TooLongLabelValues is the number of too long label values
|
|
TooLongLabelValues uint64
|
|
)
|
|
|
|
func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
|
|
atomic.AddUint64(&MetricsWithDroppedLabels, 1)
|
|
select {
|
|
case <-droppedLabelsLogTicker.C:
|
|
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
|
|
// because labelsToString() will be called with each trackDroppedLabels call.
|
|
logger.Warnf("dropping %d labels for %s; dropped labels: %s; either reduce the number of labels for this metric "+
|
|
"or increase -maxLabelsPerTimeseries=%d command-line flag value",
|
|
len(droppedLabels), labelsToString(labels), labelsToString(droppedLabels), maxLabelsPerTimeseries)
|
|
default:
|
|
}
|
|
}
|
|
|
|
func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) {
|
|
atomic.AddUint64(&TooLongLabelValues, 1)
|
|
select {
|
|
case <-truncatedLabelsLogTicker.C:
|
|
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
|
|
// because labelsToString() will be called with each trackTruncatedLabels call.
|
|
logger.Warnf("truncated label value as it exceeds configured maximal label value length: max %d, actual %d;"+
|
|
" truncated label: %s; original labels: %s; either reduce the label value length or increase -maxLabelValueLen=%d;",
|
|
maxLabelValueLen, len(truncated.Value), truncated.Name, labelsToString(labels), maxLabelValueLen)
|
|
default:
|
|
}
|
|
}
|
|
|
|
var droppedLabelsLogTicker = time.NewTicker(5 * time.Second)
|
|
var truncatedLabelsLogTicker = time.NewTicker(5 * time.Second)
|
|
|
|
func labelsToString(labels []prompb.Label) string {
|
|
labelsCopy := append([]prompb.Label{}, labels...)
|
|
sort.Slice(labelsCopy, func(i, j int) bool {
|
|
return string(labelsCopy[i].Name) < string(labelsCopy[j].Name)
|
|
})
|
|
var b []byte
|
|
b = append(b, '{')
|
|
for i, label := range labelsCopy {
|
|
if len(label.Name) == 0 {
|
|
b = append(b, "__name__"...)
|
|
} else {
|
|
b = append(b, label.Name...)
|
|
}
|
|
b = append(b, '=')
|
|
b = strconv.AppendQuote(b, string(label.Value))
|
|
if i < len(labels)-1 {
|
|
b = append(b, ',')
|
|
}
|
|
}
|
|
b = append(b, '}')
|
|
return string(b)
|
|
}
|
|
|
|
// MarshalMetricLabelRaw marshals label to dst.
|
|
func MarshalMetricLabelRaw(dst []byte, label *prompb.Label) []byte {
|
|
dst = marshalBytesFast(dst, label.Name)
|
|
dst = marshalBytesFast(dst, label.Value)
|
|
return dst
|
|
}
|
|
|
|
// marshalRaw marshals mn to dst and returns the result.
|
|
//
|
|
// The results may be unmarshaled with MetricName.UnmarshalRaw.
|
|
//
|
|
// This function is for testing purposes. MarshalMetricNameRaw must be used
|
|
// in prod instead.
|
|
func (mn *MetricName) marshalRaw(dst []byte) []byte {
|
|
dst = encoding.MarshalUint32(dst, mn.AccountID)
|
|
dst = encoding.MarshalUint32(dst, mn.ProjectID)
|
|
dst = marshalBytesFast(dst, nil)
|
|
dst = marshalBytesFast(dst, mn.MetricGroup)
|
|
|
|
mn.sortTags()
|
|
for i := range mn.Tags {
|
|
tag := &mn.Tags[i]
|
|
dst = marshalBytesFast(dst, tag.Key)
|
|
dst = marshalBytesFast(dst, tag.Value)
|
|
}
|
|
return dst
|
|
}
|
|
|
|
// UnmarshalRaw unmarshals mn encoded with MarshalMetricNameRaw.
|
|
func (mn *MetricName) UnmarshalRaw(src []byte) error {
|
|
mn.Reset()
|
|
if len(src) < 4 {
|
|
return fmt.Errorf("not enough data for decoding accountID; got %d bytes; %X; want at least 4 bytes", len(src), src)
|
|
}
|
|
mn.AccountID = encoding.UnmarshalUint32(src)
|
|
src = src[4:]
|
|
if len(src) < 4 {
|
|
return fmt.Errorf("not enough data for decoding projectID; got %d bytes; %X; want at least 4 bytes", len(src), src)
|
|
}
|
|
mn.ProjectID = encoding.UnmarshalUint32(src)
|
|
src = src[4:]
|
|
for len(src) > 0 {
|
|
tail, key, err := unmarshalBytesFast(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decode key: %w", err)
|
|
}
|
|
src = tail
|
|
|
|
tail, value, err := unmarshalBytesFast(src)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot decode value: %w", err)
|
|
}
|
|
src = tail
|
|
|
|
if len(key) == 0 {
|
|
mn.MetricGroup = append(mn.MetricGroup[:0], value...)
|
|
} else {
|
|
mn.AddTagBytes(key, value)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func marshalBytesFast(dst []byte, s []byte) []byte {
|
|
dst = encoding.MarshalUint16(dst, uint16(len(s)))
|
|
dst = append(dst, s...)
|
|
return dst
|
|
}
|
|
|
|
func unmarshalBytesFast(src []byte) ([]byte, []byte, error) {
|
|
if len(src) < 2 {
|
|
return src, nil, fmt.Errorf("cannot decode size form src=%X; it must be at least 2 bytes", src)
|
|
}
|
|
n := encoding.UnmarshalUint16(src)
|
|
src = src[2:]
|
|
if len(src) < int(n) {
|
|
return src, nil, fmt.Errorf("too short src=%X; it must be at least %d bytes", src, n)
|
|
}
|
|
return src[n:], src[:n], nil
|
|
}
|
|
|
|
// sortTags sorts tags in mn to canonical form needed for storing in the index.
|
|
//
|
|
// The sortTags tries moving job-like tag to mn.Tags[0], while instance-like tag to mn.Tags[1].
|
|
// See commonTagKeys list for job-like and instance-like tags.
|
|
// This guarantees that indexdb entries for the same (job, instance) are located
|
|
// close to each other on disk. This reduces disk seeks and disk read IO when metrics
|
|
// for a particular job and/or instance are read from the disk.
|
|
//
|
|
// The function also de-duplicates tags with identical keys in mn. The last tag value
|
|
// for duplicate tags wins.
|
|
//
|
|
// Tags sorting is quite slow, so try avoiding it by caching mn
|
|
// with sorted tags.
|
|
func (mn *MetricName) sortTags() {
|
|
if len(mn.Tags) == 0 {
|
|
return
|
|
}
|
|
|
|
cts := getCanonicalTags()
|
|
if n := len(mn.Tags) - cap(cts.tags); n > 0 {
|
|
cts.tags = append(cts.tags[:cap(cts.tags)], make([]canonicalTag, n)...)
|
|
}
|
|
dst := cts.tags[:len(mn.Tags)]
|
|
for i := range mn.Tags {
|
|
tag := &mn.Tags[i]
|
|
ct := &dst[i]
|
|
ct.key = normalizeTagKey(tag.Key)
|
|
ct.tag.copyFrom(tag)
|
|
}
|
|
cts.tags = dst
|
|
|
|
// Use sort.Stable instead of sort.Sort in order to preserve the order of tags with duplicate keys.
|
|
// The last tag value wins for tags with duplicate keys.
|
|
// Use sort.Stable instead of sort.SliceStable, since sort.SliceStable allocates a lot.
|
|
sort.Stable(&cts.tags)
|
|
|
|
j := 0
|
|
var prevKey []byte
|
|
for i := range cts.tags {
|
|
tag := &cts.tags[i].tag
|
|
if j > 0 && bytes.Equal(tag.Key, prevKey) {
|
|
// Overwrite the previous tag with duplicate key.
|
|
j--
|
|
} else {
|
|
prevKey = tag.Key
|
|
}
|
|
mn.Tags[j].copyFrom(tag)
|
|
j++
|
|
}
|
|
mn.Tags = mn.Tags[:j]
|
|
|
|
putCanonicalTags(cts)
|
|
}
|
|
|
|
func getCanonicalTags() *canonicalTags {
|
|
v := canonicalTagsPool.Get()
|
|
if v == nil {
|
|
return &canonicalTags{}
|
|
}
|
|
return v.(*canonicalTags)
|
|
}
|
|
|
|
func putCanonicalTags(cts *canonicalTags) {
|
|
cts.tags = cts.tags[:0]
|
|
canonicalTagsPool.Put(cts)
|
|
}
|
|
|
|
var canonicalTagsPool sync.Pool
|
|
|
|
type canonicalTags struct {
|
|
tags canonicalTagsSort
|
|
}
|
|
|
|
type canonicalTag struct {
|
|
key []byte
|
|
tag Tag
|
|
}
|
|
|
|
type canonicalTagsSort []canonicalTag
|
|
|
|
func (ts *canonicalTagsSort) Len() int { return len(*ts) }
|
|
func (ts *canonicalTagsSort) Less(i, j int) bool {
|
|
x := *ts
|
|
return string(x[i].key) < string(x[j].key)
|
|
}
|
|
func (ts *canonicalTagsSort) Swap(i, j int) {
|
|
x := *ts
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func copyTags(dst, src []Tag) []Tag {
|
|
dstLen := len(dst)
|
|
if n := dstLen + len(src) - cap(dst); n > 0 {
|
|
dst = append(dst[:cap(dst)], make([]Tag, n)...)
|
|
}
|
|
dst = dst[:dstLen+len(src)]
|
|
for i := range src {
|
|
dst[dstLen+i].copyFrom(&src[i])
|
|
}
|
|
return dst
|
|
}
|
|
|
|
var commonTagKeys = func() map[string][]byte {
|
|
lcm := map[string][]byte{
|
|
// job-like tags must go first in MetricName.Tags.
|
|
// This should improve data locality.
|
|
// They start with \x00\x00.
|
|
// Do not change values!
|
|
//
|
|
// TODO: add more job-like tags.
|
|
"namespace": []byte("\x00\x00\x00"),
|
|
"ns": []byte("\x00\x00\x01"),
|
|
"datacenter": []byte("\x00\x00\x08"),
|
|
"dc": []byte("\x00\x00\x09"),
|
|
"environment": []byte("\x00\x00\x0c"),
|
|
"env": []byte("\x00\x00\x0d"),
|
|
"cluster": []byte("\x00\x00\x10"),
|
|
"service": []byte("\x00\x00\x18"),
|
|
"job": []byte("\x00\x00\x20"),
|
|
"model": []byte("\x00\x00\x28"),
|
|
"type": []byte("\x00\x00\x30"),
|
|
"sensor_type": []byte("\x00\x00\x38"),
|
|
"SensorType": []byte("\x00\x00\x38"),
|
|
"db": []byte("\x00\x00\x40"),
|
|
|
|
// instance-like tags must go second in MetricName.Tags.
|
|
// This should improve data locality.
|
|
// They start with \x00\x01.
|
|
// Do not change values!
|
|
//
|
|
// TODO: add more instance-like tags.
|
|
"instance": []byte("\x00\x01\x00"),
|
|
"host": []byte("\x00\x01\x08"),
|
|
"server": []byte("\x00\x01\x10"),
|
|
"pod": []byte("\x00\x01\x18"),
|
|
"node": []byte("\x00\x01\x20"),
|
|
"device": []byte("\x00\x01\x28"),
|
|
"tenant": []byte("\x00\x01\x30"),
|
|
"client": []byte("\x00\x01\x38"),
|
|
"name": []byte("\x00\x01\x40"),
|
|
"measurement": []byte("\x00\x01\x48"),
|
|
}
|
|
|
|
// Generate Upper-case variants of lc
|
|
m := make(map[string][]byte, len(lcm)*2)
|
|
for k, v := range lcm {
|
|
s := strings.ToUpper(k[:1]) + k[1:]
|
|
m[k] = v
|
|
m[s] = v
|
|
}
|
|
return m
|
|
}()
|
|
|
|
func normalizeTagKey(key []byte) []byte {
|
|
tagKey := commonTagKeys[string(key)]
|
|
if tagKey == nil {
|
|
return key
|
|
}
|
|
return tagKey
|
|
}
|