2023-07-20 08:10:55 +00:00
|
|
|
package loki
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2023-07-20 23:21:47 +00:00
|
|
|
"strconv"
|
2023-07-20 08:10:55 +00:00
|
|
|
"testing"
|
2023-07-20 23:21:47 +00:00
|
|
|
"time"
|
2023-07-20 08:10:55 +00:00
|
|
|
|
2023-07-20 23:21:47 +00:00
|
|
|
"github.com/golang/snappy"
|
2023-09-29 09:55:38 +00:00
|
|
|
|
2024-06-17 20:28:15 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
2024-07-10 00:42:41 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
2023-07-20 08:10:55 +00:00
|
|
|
)
|
|
|
|
|
2023-07-20 23:21:47 +00:00
|
|
|
func BenchmarkParseProtobufRequest(b *testing.B) {
|
2023-07-20 08:10:55 +00:00
|
|
|
for _, streams := range []int{5, 10} {
|
|
|
|
for _, rows := range []int{100, 1000} {
|
|
|
|
for _, labels := range []int{10, 50} {
|
|
|
|
b.Run(fmt.Sprintf("streams_%d/rows_%d/labels_%d", streams, rows, labels), func(b *testing.B) {
|
2023-07-20 23:21:47 +00:00
|
|
|
benchmarkParseProtobufRequest(b, streams, rows, labels)
|
2023-07-20 08:10:55 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-20 23:21:47 +00:00
|
|
|
func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
|
2024-06-17 20:28:15 +00:00
|
|
|
blp := &insertutils.BenchmarkLogMessageProcessor{}
|
2023-07-20 08:10:55 +00:00
|
|
|
b.ReportAllocs()
|
2023-07-20 23:21:47 +00:00
|
|
|
b.SetBytes(int64(streams * rows))
|
2023-07-20 08:10:55 +00:00
|
|
|
b.RunParallel(func(pb *testing.PB) {
|
2023-07-20 23:21:47 +00:00
|
|
|
body := getProtobufBody(streams, rows, labels)
|
2023-07-20 08:10:55 +00:00
|
|
|
for pb.Next() {
|
2024-06-17 20:28:15 +00:00
|
|
|
_, err := parseProtobufRequest(body, blp)
|
2023-07-20 08:10:55 +00:00
|
|
|
if err != nil {
|
2023-10-25 19:24:01 +00:00
|
|
|
panic(fmt.Errorf("unexpected error: %w", err))
|
2023-07-20 08:10:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
2023-07-20 23:21:47 +00:00
|
|
|
|
2024-07-10 00:42:41 +00:00
|
|
|
func getProtobufBody(streamsCount, rowsCount, labelsCount int) []byte {
|
|
|
|
var b []byte
|
|
|
|
var entries []Entry
|
|
|
|
streams := make([]Stream, streamsCount)
|
|
|
|
for i := range streams {
|
|
|
|
b = b[:0]
|
|
|
|
b = append(b, '{')
|
|
|
|
for j := 0; j < labelsCount; j++ {
|
|
|
|
b = append(b, "label_"...)
|
|
|
|
b = strconv.AppendInt(b, int64(j), 10)
|
|
|
|
b = append(b, `="value_`...)
|
|
|
|
b = strconv.AppendInt(b, int64(j), 10)
|
|
|
|
b = append(b, '"')
|
|
|
|
if j < labelsCount-1 {
|
|
|
|
b = append(b, ',')
|
2023-07-20 23:21:47 +00:00
|
|
|
}
|
|
|
|
}
|
2024-07-10 00:42:41 +00:00
|
|
|
b = append(b, '}')
|
|
|
|
labels := string(b)
|
2023-07-20 23:21:47 +00:00
|
|
|
|
2024-07-10 00:42:41 +00:00
|
|
|
var rowsBuf []byte
|
|
|
|
entriesLen := len(entries)
|
|
|
|
for j := 0; j < rowsCount; j++ {
|
|
|
|
rowsBufLen := len(rowsBuf)
|
|
|
|
rowsBuf = append(rowsBuf, "value_"...)
|
|
|
|
rowsBuf = strconv.AppendInt(rowsBuf, int64(j), 10)
|
|
|
|
entries = append(entries, Entry{
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
Line: bytesutil.ToUnsafeString(rowsBuf[rowsBufLen:]),
|
|
|
|
})
|
2023-07-20 23:21:47 +00:00
|
|
|
}
|
|
|
|
|
2024-07-10 00:42:41 +00:00
|
|
|
st := &streams[i]
|
|
|
|
st.Labels = labels
|
|
|
|
st.Entries = entries[entriesLen:]
|
|
|
|
}
|
|
|
|
pr := PushRequest{
|
|
|
|
Streams: streams,
|
2023-07-20 23:21:47 +00:00
|
|
|
}
|
|
|
|
|
2024-07-10 00:42:41 +00:00
|
|
|
body := pr.MarshalProtobuf(nil)
|
2023-07-20 23:21:47 +00:00
|
|
|
encodedBody := snappy.Encode(nil, body)
|
|
|
|
|
|
|
|
return encodedBody
|
|
|
|
}
|