lib/protoparser/prometheus: optimize GetRowsDiff() function

This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1745 ,
since the provided profile shows that the majority of CPU and memory is spent in this function
during `streamParse` when `-promscrape.noStaleMarkers` wasn't set.
This commit is contained in:
Aliaksandr Valialkin 2021-10-27 18:51:09 +03:00
parent 16f1aaf0b5
commit 92d01db85a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 150 additions and 32 deletions

View file

@ -1,9 +1,11 @@
package prometheus package prometheus
import ( import (
"bytes"
"fmt" "fmt"
"strconv" "sort"
"strings" "strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -357,6 +359,29 @@ func unescapeValue(s string) string {
return string(b) return string(b)
} }
// appendEscapedValue appends s to dst in the form of an escaped Prometheus label value
// and returns the extended slice.
//
// Backslash, double quote and line feed are written as \\, \" and \n respectively;
// every other byte is copied as is.
// See https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
func appendEscapedValue(dst []byte, s string) []byte {
	const specialChars = "\\\"\n"
	for pos := strings.IndexAny(s, specialChars); pos >= 0; pos = strings.IndexAny(s, specialChars) {
		// Copy the chunk preceding the special char without changes.
		dst = append(dst, s[:pos]...)
		var escaped string
		switch s[pos] {
		case '\\':
			escaped = "\\\\"
		case '"':
			escaped = "\\\""
		case '\n':
			escaped = "\\n"
		}
		dst = append(dst, escaped...)
		// Continue scanning right after the escaped char.
		s = s[pos+1:]
	}
	// No special chars are left - the tail can be appended verbatim.
	return append(dst, s...)
}
func prevBackslashesCount(s string) int { func prevBackslashesCount(s string) int {
n := 0 n := 0
for len(s) > 0 && s[len(s)-1] == '\\' { for len(s) > 0 && s[len(s)-1] == '\\' {
@ -370,46 +395,125 @@ func prevBackslashesCount(s string) int {
// //
// The returned rows have default value 0 and have no timestamps. // The returned rows have default value 0 and have no timestamps.
func GetRowsDiff(s1, s2 string) string { func GetRowsDiff(s1, s2 string) string {
var r1, r2 Rows li1 := getLinesIterator()
r1.Unmarshal(s1) li2 := getLinesIterator()
r2.Unmarshal(s2) defer func() {
rows1 := r1.Rows putLinesIterator(li1)
rows2 := r2.Rows putLinesIterator(li2)
m := make(map[string]bool, len(rows2)) }()
for i := range rows2 { li1.Init(s1)
r := &rows2[i] li2.Init(s2)
key := marshalMetricNameWithTags(r) if !li1.NextKey() {
m[key] = true return ""
} }
var diff []byte var diff []byte
for i := range rows1 { if !li2.NextKey() {
r := &rows1[i] diff = appendKeys(diff, li1)
key := marshalMetricNameWithTags(r) return string(diff)
if !m[key] { }
diff = append(diff, key...) for {
diff = append(diff, " 0\n"...) switch bytes.Compare(li1.Key, li2.Key) {
case -1:
diff = appendKey(diff, li1.Key)
if !li1.NextKey() {
return string(diff)
}
case 0:
if !li1.NextKey() {
return string(diff)
}
if !li2.NextKey() {
diff = appendKeys(diff, li1)
return string(diff)
}
case 1:
if !li2.NextKey() {
diff = appendKeys(diff, li1)
return string(diff)
}
} }
} }
return string(diff)
} }
func marshalMetricNameWithTags(r *Row) string { type linesIterator struct {
if len(r.Tags) == 0 { rows []Row
return r.Metric a []string
tagsPool []Tag
// Key contains the next key after NextKey call
Key []byte
}
var linesIteratorPool sync.Pool
func getLinesIterator() *linesIterator {
v := linesIteratorPool.Get()
if v == nil {
return &linesIterator{}
} }
var b []byte return v.(*linesIterator)
b = append(b, r.Metric...) }
b = append(b, '{')
for i, t := range r.Tags { func putLinesIterator(li *linesIterator) {
b = append(b, t.Key...) li.a = nil
b = append(b, '=') linesIteratorPool.Put(li)
b = strconv.AppendQuote(b, t.Value) }
if i+1 < len(r.Tags) {
b = append(b, ',') func (li *linesIterator) Init(s string) {
a := strings.Split(s, "\n")
sort.Strings(a)
li.a = a
}
// NextKey advances to the next key in li.
//
// It returns true if the next key is found and Key is successfully updated.
func (li *linesIterator) NextKey() bool {
for {
if len(li.a) == 0 {
return false
}
li.rows, li.tagsPool = unmarshalRow(li.rows[:0], li.a[0], li.tagsPool[:0], false, stdErrLogger)
li.a = li.a[1:]
if len(li.rows) > 0 {
li.Key = marshalMetricNameWithTags(li.Key[:0], &li.rows[0])
return true
} }
} }
b = append(b, '}') }
return string(b)
// appendKey appends key followed by the " 0\n" suffix to dst and returns the extended slice.
//
// The suffix turns the key into a complete line with value 0 and no timestamp.
func appendKey(dst, key []byte) []byte {
	return append(append(dst, key...), " 0\n"...)
}
// appendKeys appends the current li.Key and every key subsequently returned by li.NextKey to dst,
// each one formatted by appendKey, and returns the extended slice.
//
// The current li.Key is always emitted first, so li must already hold a key when this is called.
func appendKeys(dst []byte, li *linesIterator) []byte {
	dst = appendKey(dst, li.Key)
	for li.NextKey() {
		dst = appendKey(dst, li.Key)
	}
	return dst
}
// marshalMetricNameWithTags appends the text form of r's metric name and tags to dst
// and returns the extended slice.
//
// The result is `metric` when r has no tags and `metric{key="value",...}` otherwise.
// Tag values are escaped with appendEscapedValue; tag keys are appended as is.
func marshalMetricNameWithTags(dst []byte, r *Row) []byte {
dst = append(dst, r.Metric...)
if len(r.Tags) == 0 {
return dst
}
dst = append(dst, '{')
for i, t := range r.Tags {
dst = append(dst, t.Key...)
dst = append(dst, `="`...)
dst = appendEscapedValue(dst, t.Value)
dst = append(dst, '"')
// Put commas only between tags, so there is no trailing comma before '}'.
if i+1 < len(r.Tags) {
dst = append(dst, ',')
}
}
dst = append(dst, '}')
return dst
} }
// AreIdenticalSeriesFast returns true if s1 and s2 contains identical Prometheus series with possible different values. // AreIdenticalSeriesFast returns true if s1 and s2 contains identical Prometheus series with possible different values.

View file

@ -164,6 +164,20 @@ func TestUnescapeValue(t *testing.T) {
f(`foo\`, "foo\\") f(`foo\`, "foo\\")
} }
// TestAppendEscapedValue verifies that appendEscapedValue escapes backslashes, double quotes
// and line feeds while leaving other chars (such as tabs) untouched.
func TestAppendEscapedValue(t *testing.T) {
	f := func(s, resultExpected string) {
		t.Helper()
		got := string(appendEscapedValue(nil, s))
		if got == resultExpected {
			return
		}
		t.Fatalf("unexpected result; got %q; want %q", got, resultExpected)
	}

	// Values without special chars must be returned unchanged.
	f(``, ``)
	f(`f`, `f`)
	f(`foobar`, `foobar`)

	// A mix of chars that must be escaped and a tab that must be kept as is.
	f("\"\n\t\\xyz", "\\\"\\n\t\\\\xyz")
}
func TestRowsUnmarshalFailure(t *testing.T) { func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()