mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00

Previously, the performance of stream.Parse could be limited by a mutex.Lock in the callback function, which used a shared writeContext. With complicated relabeling rules or any slowness in the pushData function, this could significantly decrease the processing performance of parsed rows. This commit removes the locks and makes parsed rows processing lock-free, in the same manner as `stream.Parse` processing is implemented for push ingestion.

Implementation details:
- Removing the global lock around the stream.Parse callback.
- Using atomic operations for counters.
- Creating write contexts per callback instead of sharing one.
- Improving series limit checking with sync.Once.
- Optimizing labels hash calculation with buffer pooling.
- Adding comprehensive tests for concurrency correctness.

Benchmark performance:
```
# before
BenchmarkScrapeWorkScrapeInternalStreamBigData-10 13 81973945 ns/op 37.68 MB/s 18947868 B/op 197 allocs/op
# after
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape
cpu: Apple M1 Pro
BenchmarkScrapeWorkScrapeInternalStreamBigData-10 74 15761331 ns/op 195.98 MB/s 15487399 B/op 148 allocs/op
PASS
ok github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape 1.806s
```

Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8159

---------

Signed-off-by: Maksim Kotlyar <kotlyar.maksim@gmail.com>
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
212 lines
6.6 KiB
Go
212 lines
6.6 KiB
Go
package promscrape
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
|
|
)
|
|
|
|
func BenchmarkIsAutoMetricMiss(b *testing.B) {
|
|
metrics := []string{
|
|
"process_cpu_seconds_total",
|
|
"process_resident_memory_bytes",
|
|
"vm_tcplistener_read_calls_total",
|
|
"http_requests_total",
|
|
"node_cpu_seconds_total",
|
|
}
|
|
b.ReportAllocs()
|
|
b.SetBytes(1)
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
for pb.Next() {
|
|
for _, metric := range metrics {
|
|
if isAutoMetric(metric) {
|
|
panic(fmt.Errorf("BUG: %q mustn't be detected as auto metric", metric))
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func BenchmarkIsAutoMetricHit(b *testing.B) {
|
|
metrics := []string{
|
|
"up",
|
|
"scrape_duration_seconds",
|
|
"scrape_series_current",
|
|
"scrape_samples_scraped",
|
|
"scrape_series_added",
|
|
}
|
|
b.ReportAllocs()
|
|
b.SetBytes(1)
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
for pb.Next() {
|
|
for _, metric := range metrics {
|
|
if !isAutoMetric(metric) {
|
|
panic(fmt.Errorf("BUG: %q must be detected as auto metric", metric))
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func BenchmarkScrapeWorkScrapeInternalOneShot(b *testing.B) {
|
|
data := `
|
|
vm_tcplistener_accepts_total{name="http", addr=":80"} 1443
|
|
vm_tcplistener_accepts_total{name="https", addr=":443"} 12801
|
|
vm_tcplistener_conns{name="http", addr=":80"} 0
|
|
vm_tcplistener_conns{name="https", addr=":443"} 2
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="accept"} 0
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="close"} 0
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="read"} 97
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="write"} 2
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="accept"} 0
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="close"} 0
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="read"} 243
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="write"} 285
|
|
vm_tcplistener_read_bytes_total{name="http", addr=":80"} 879339
|
|
vm_tcplistener_read_bytes_total{name="https", addr=":443"} 19453340
|
|
vm_tcplistener_read_calls_total{name="http", addr=":80"} 7780
|
|
vm_tcplistener_read_calls_total{name="https", addr=":443"} 70323
|
|
vm_tcplistener_read_timeouts_total{name="http", addr=":80"} 673
|
|
vm_tcplistener_read_timeouts_total{name="https", addr=":443"} 12353
|
|
vm_tcplistener_write_calls_total{name="http", addr=":80"} 3996
|
|
vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
|
|
`
|
|
readDataFunc := func(dst *bytesutil.ByteBuffer) error {
|
|
dst.B = append(dst.B, data...)
|
|
return nil
|
|
}
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(len(data)))
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var sw scrapeWork
|
|
sw.Config = &ScrapeWork{}
|
|
sw.ReadData = readDataFunc
|
|
sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {}
|
|
tsmGlobal.Register(&sw)
|
|
timestamp := int64(0)
|
|
for pb.Next() {
|
|
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
|
|
panic(fmt.Errorf("unexpected error: %w", err))
|
|
}
|
|
timestamp++
|
|
}
|
|
tsmGlobal.Unregister(&sw)
|
|
})
|
|
}
|
|
|
|
func BenchmarkScrapeWorkScrapeInternalStream(b *testing.B) {
|
|
data := `
|
|
vm_tcplistener_accepts_total{name="http", addr=":80"} 1443
|
|
vm_tcplistener_accepts_total{name="https", addr=":443"} 12801
|
|
vm_tcplistener_conns{name="http", addr=":80"} 0
|
|
vm_tcplistener_conns{name="https", addr=":443"} 2
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="accept"} 0
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="close"} 0
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="read"} 97
|
|
vm_tcplistener_errors_total{name="http", addr=":80", type="write"} 2
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="accept"} 0
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="close"} 0
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="read"} 243
|
|
vm_tcplistener_errors_total{name="https", addr=":443", type="write"} 285
|
|
vm_tcplistener_read_bytes_total{name="http", addr=":80"} 879339
|
|
vm_tcplistener_read_bytes_total{name="https", addr=":443"} 19453340
|
|
vm_tcplistener_read_calls_total{name="http", addr=":80"} 7780
|
|
vm_tcplistener_read_calls_total{name="https", addr=":443"} 70323
|
|
vm_tcplistener_read_timeouts_total{name="http", addr=":80"} 673
|
|
vm_tcplistener_read_timeouts_total{name="https", addr=":443"} 12353
|
|
vm_tcplistener_write_calls_total{name="http", addr=":80"} 3996
|
|
vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
|
|
`
|
|
protoparserutil.StartUnmarshalWorkers()
|
|
defer protoparserutil.StopUnmarshalWorkers()
|
|
|
|
readDataFunc := func(dst *bytesutil.ByteBuffer) error {
|
|
dst.B = append(dst.B, data...)
|
|
return nil
|
|
}
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(len(data)))
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var sw scrapeWork
|
|
sw.Config = &ScrapeWork{
|
|
StreamParse: true,
|
|
}
|
|
sw.ReadData = readDataFunc
|
|
sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {}
|
|
tsmGlobal.Register(&sw)
|
|
timestamp := int64(0)
|
|
for pb.Next() {
|
|
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
|
|
panic(fmt.Errorf("unexpected error: %w", err))
|
|
}
|
|
timestamp++
|
|
}
|
|
tsmGlobal.Unregister(&sw)
|
|
})
|
|
}
|
|
|
|
func BenchmarkScrapeWorkScrapeInternalStreamBigData(b *testing.B) {
|
|
generateScrape := func(n int) string {
|
|
w := strings.Builder{}
|
|
for i := 0; i < n; i++ {
|
|
w.WriteString(fmt.Sprintf("fooooo_%d 1\n", i))
|
|
}
|
|
return w.String()
|
|
}
|
|
|
|
data := generateScrape(200000)
|
|
protoparserutil.StartUnmarshalWorkers()
|
|
defer protoparserutil.StopUnmarshalWorkers()
|
|
|
|
readDataFunc := func(dst *bytesutil.ByteBuffer) error {
|
|
dst.B = append(dst.B, data...)
|
|
return nil
|
|
}
|
|
|
|
var sw scrapeWork
|
|
sw.Config = &ScrapeWork{
|
|
StreamParse: true,
|
|
}
|
|
sw.ReadData = readDataFunc
|
|
sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {
|
|
// simulates a delay to highlight the difference between lock-based and lock-free algorithms.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8515
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
tsmGlobal.Register(&sw)
|
|
defer tsmGlobal.Unregister(&sw)
|
|
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(len(data)))
|
|
|
|
timestamp := int64(0)
|
|
for b.Loop() {
|
|
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
|
|
panic(fmt.Errorf("unexpected error: %w", err))
|
|
}
|
|
timestamp++
|
|
}
|
|
}
|
|
|
|
func BenchmarkScrapeWorkGetLabelsHash(b *testing.B) {
|
|
labels := []prompbmarshal.Label{}
|
|
for i := 0; i < 100; i++ {
|
|
labels = append(labels, prompbmarshal.Label{
|
|
Name: fmt.Sprintf("name%d", i),
|
|
Value: fmt.Sprintf("value%d", i),
|
|
})
|
|
}
|
|
|
|
var sw scrapeWork
|
|
|
|
b.ReportAllocs()
|
|
for b.Loop() {
|
|
sw.getLabelsHash(labels)
|
|
}
|
|
}
|