VictoriaMetrics/lib/promscrape/scrapework_timing_test.go
Max Kotliar 0778c90901
lib/promscrape: improve streamParse performance
Previously, the performance of stream.Parse could be limited by the mutex.Lock held around the callback function, which used a shared writeContext. With complicated relabeling rules and any slowness in the pushData function, this could significantly decrease the performance of parsed rows processing.

 This commit removes the locks and makes parsed rows processing lock-free, in the same manner as `stream.Parse` processing is implemented for push ingestion.

 Implementation details:
- Removing global lock around stream.Parse callback.
- Using atomic operations for counters
- Creating write contexts per callback instead of sharing
- Improving series limit checking with sync.Once
- Optimizing labels hash calculation with buffer pooling
- Adding comprehensive tests for concurrency correctness

 Benchmark performance:
```
# before
BenchmarkScrapeWorkScrapeInternalStreamBigData-10             13          81973945 ns/op          37.68 MB/s    18947868 B/op        197 allocs/op

# after
goos: darwin
goarch: arm64
pkg: github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape
cpu: Apple M1 Pro
BenchmarkScrapeWorkScrapeInternalStreamBigData-10             74          15761331 ns/op         195.98 MB/s    15487399 B/op        148 allocs/op
PASS
ok      github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape       1.806s
```

Related issue:
 https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8159
---------
Signed-off-by: Maksim Kotlyar <kotlyar.maksim@gmail.com>
Co-authored-by: Roman Khavronenko <hagen1778@gmail.com>
2025-03-20 16:56:05 +01:00

212 lines
6.6 KiB
Go

package promscrape
import (
"fmt"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
)
// BenchmarkIsAutoMetricMiss benchmarks isAutoMetric on metric names for which
// it must report false. The benchmark panics if any of the names is reported
// as an auto metric.
func BenchmarkIsAutoMetricMiss(b *testing.B) {
	names := []string{
		"process_cpu_seconds_total",
		"process_resident_memory_bytes",
		"vm_tcplistener_read_calls_total",
		"http_requests_total",
		"node_cpu_seconds_total",
	}

	b.ReportAllocs()
	b.SetBytes(1)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			for i := range names {
				name := names[i]
				if !isAutoMetric(name) {
					continue
				}
				panic(fmt.Errorf("BUG: %q mustn't be detected as auto metric", name))
			}
		}
	})
}
// BenchmarkIsAutoMetricHit benchmarks isAutoMetric on metric names for which
// it must report true. The benchmark panics if any of the names is not
// reported as an auto metric.
func BenchmarkIsAutoMetricHit(b *testing.B) {
	names := []string{
		"up",
		"scrape_duration_seconds",
		"scrape_series_current",
		"scrape_samples_scraped",
		"scrape_series_added",
	}

	b.ReportAllocs()
	b.SetBytes(1)
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			for i := range names {
				name := names[i]
				if isAutoMetric(name) {
					continue
				}
				panic(fmt.Errorf("BUG: %q must be detected as auto metric", name))
			}
		}
	})
}
// BenchmarkScrapeWorkScrapeInternalOneShot benchmarks scrapeWork.scrapeInternal
// with an empty ScrapeWork config. The same fixed text payload is returned by
// ReadData on every call and everything passed to PushData is dropped.
func BenchmarkScrapeWorkScrapeInternalOneShot(b *testing.B) {
	data := `
vm_tcplistener_accepts_total{name="http", addr=":80"} 1443
vm_tcplistener_accepts_total{name="https", addr=":443"} 12801
vm_tcplistener_conns{name="http", addr=":80"} 0
vm_tcplistener_conns{name="https", addr=":443"} 2
vm_tcplistener_errors_total{name="http", addr=":80", type="accept"} 0
vm_tcplistener_errors_total{name="http", addr=":80", type="close"} 0
vm_tcplistener_errors_total{name="http", addr=":80", type="read"} 97
vm_tcplistener_errors_total{name="http", addr=":80", type="write"} 2
vm_tcplistener_errors_total{name="https", addr=":443", type="accept"} 0
vm_tcplistener_errors_total{name="https", addr=":443", type="close"} 0
vm_tcplistener_errors_total{name="https", addr=":443", type="read"} 243
vm_tcplistener_errors_total{name="https", addr=":443", type="write"} 285
vm_tcplistener_read_bytes_total{name="http", addr=":80"} 879339
vm_tcplistener_read_bytes_total{name="https", addr=":443"} 19453340
vm_tcplistener_read_calls_total{name="http", addr=":80"} 7780
vm_tcplistener_read_calls_total{name="https", addr=":443"} 70323
vm_tcplistener_read_timeouts_total{name="http", addr=":80"} 673
vm_tcplistener_read_timeouts_total{name="https", addr=":443"} 12353
vm_tcplistener_write_calls_total{name="http", addr=":80"} 3996
vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
`
	// fillBuf appends the whole payload to dst on every call.
	fillBuf := func(dst *bytesutil.ByteBuffer) error {
		dst.B = append(dst.B, data...)
		return nil
	}

	b.SetBytes(int64(len(data)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		// Each invocation of this body works with its own scrapeWork,
		// registered in tsmGlobal for the duration of the loop.
		var sw scrapeWork
		sw.Config = &ScrapeWork{}
		sw.ReadData = fillBuf
		sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {}
		tsmGlobal.Register(&sw)

		// The timestamp is advanced by one after every successful scrape.
		for ts := int64(0); pb.Next(); ts++ {
			err := sw.scrapeInternal(ts, ts)
			if err != nil {
				panic(fmt.Errorf("unexpected error: %w", err))
			}
		}

		tsmGlobal.Unregister(&sw)
	})
}
// BenchmarkScrapeWorkScrapeInternalStream benchmarks scrapeWork.scrapeInternal
// with StreamParse enabled in the ScrapeWork config. The same fixed text
// payload is returned by ReadData on every call and everything passed to
// PushData is dropped.
func BenchmarkScrapeWorkScrapeInternalStream(b *testing.B) {
	data := `
vm_tcplistener_accepts_total{name="http", addr=":80"} 1443
vm_tcplistener_accepts_total{name="https", addr=":443"} 12801
vm_tcplistener_conns{name="http", addr=":80"} 0
vm_tcplistener_conns{name="https", addr=":443"} 2
vm_tcplistener_errors_total{name="http", addr=":80", type="accept"} 0
vm_tcplistener_errors_total{name="http", addr=":80", type="close"} 0
vm_tcplistener_errors_total{name="http", addr=":80", type="read"} 97
vm_tcplistener_errors_total{name="http", addr=":80", type="write"} 2
vm_tcplistener_errors_total{name="https", addr=":443", type="accept"} 0
vm_tcplistener_errors_total{name="https", addr=":443", type="close"} 0
vm_tcplistener_errors_total{name="https", addr=":443", type="read"} 243
vm_tcplistener_errors_total{name="https", addr=":443", type="write"} 285
vm_tcplistener_read_bytes_total{name="http", addr=":80"} 879339
vm_tcplistener_read_bytes_total{name="https", addr=":443"} 19453340
vm_tcplistener_read_calls_total{name="http", addr=":80"} 7780
vm_tcplistener_read_calls_total{name="https", addr=":443"} 70323
vm_tcplistener_read_timeouts_total{name="http", addr=":80"} 673
vm_tcplistener_read_timeouts_total{name="https", addr=":443"} 12353
vm_tcplistener_write_calls_total{name="http", addr=":80"} 3996
vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
`
	// The unmarshal workers are started for the whole benchmark
	// and stopped when it returns.
	protoparserutil.StartUnmarshalWorkers()
	defer protoparserutil.StopUnmarshalWorkers()

	// fillBuf appends the whole payload to dst on every call.
	fillBuf := func(dst *bytesutil.ByteBuffer) error {
		dst.B = append(dst.B, data...)
		return nil
	}

	b.SetBytes(int64(len(data)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		// Each invocation of this body works with its own scrapeWork,
		// registered in tsmGlobal for the duration of the loop.
		var sw scrapeWork
		sw.Config = &ScrapeWork{StreamParse: true}
		sw.ReadData = fillBuf
		sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {}
		tsmGlobal.Register(&sw)

		// The timestamp is advanced by one after every successful scrape.
		for ts := int64(0); pb.Next(); ts++ {
			err := sw.scrapeInternal(ts, ts)
			if err != nil {
				panic(fmt.Errorf("unexpected error: %w", err))
			}
		}

		tsmGlobal.Unregister(&sw)
	})
}
// BenchmarkScrapeWorkScrapeInternalStreamBigData benchmarks
// scrapeWork.scrapeInternal with StreamParse enabled on a generated payload of
// 200000 samples, using a PushData callback that sleeps for one millisecond
// on every call.
func BenchmarkScrapeWorkScrapeInternalStreamBigData(b *testing.B) {
	// generateScrape returns n lines of the form "fooooo_<i> 1\n".
	generateScrape := func(n int) string {
		var w strings.Builder
		for i := 0; i < n; i++ {
			// Format straight into the builder instead of building
			// a temporary string per line via fmt.Sprintf.
			fmt.Fprintf(&w, "fooooo_%d 1\n", i)
		}
		return w.String()
	}
	data := generateScrape(200000)

	// The unmarshal workers are started for the whole benchmark
	// and stopped when it returns.
	protoparserutil.StartUnmarshalWorkers()
	defer protoparserutil.StopUnmarshalWorkers()

	// readDataFunc appends the whole generated payload to dst on every call.
	readDataFunc := func(dst *bytesutil.ByteBuffer) error {
		dst.B = append(dst.B, data...)
		return nil
	}

	var sw scrapeWork
	sw.Config = &ScrapeWork{
		StreamParse: true,
	}
	sw.ReadData = readDataFunc
	sw.PushData = func(_ *auth.Token, _ *prompbmarshal.WriteRequest) {
		// simulates a delay to highlight the difference between lock-based and lock-free algorithms.
		// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8515
		time.Sleep(time.Millisecond)
	}
	tsmGlobal.Register(&sw)
	defer tsmGlobal.Unregister(&sw)

	b.ReportAllocs()
	b.SetBytes(int64(len(data)))

	// The timestamp is advanced by one after every successful scrape.
	timestamp := int64(0)
	for b.Loop() {
		if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
			panic(fmt.Errorf("unexpected error: %w", err))
		}
		timestamp++
	}
}
// BenchmarkScrapeWorkGetLabelsHash benchmarks scrapeWork.getLabelsHash on a
// fixed set of 100 labels named "name<i>" with values "value<i>", called on a
// zero-value scrapeWork.
func BenchmarkScrapeWorkGetLabelsHash(b *testing.B) {
	const labelsCount = 100

	labels := make([]prompbmarshal.Label, labelsCount)
	for i := range labels {
		labels[i].Name = fmt.Sprintf("name%d", i)
		labels[i].Value = fmt.Sprintf("value%d", i)
	}

	var sw scrapeWork

	b.ReportAllocs()
	for b.Loop() {
		sw.getLabelsHash(labels)
	}
}