mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vmctl: add remote read protocol integration tests (#3626)
This commit is contained in:
parent
2e018aebf3
commit
99fbb2948b
3 changed files with 771 additions and 0 deletions
app/vmctl
319
app/vmctl/remote_read_test.go
Normal file
319
app/vmctl/remote_read_test.go
Normal file
|
@ -0,0 +1,319 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRemoteRead(t *testing.T) {
|
||||||
|
|
||||||
|
var testCases = []struct {
|
||||||
|
name string
|
||||||
|
remoteReadConfig remoteread.Config
|
||||||
|
vmCfg vm.Config
|
||||||
|
start string
|
||||||
|
end string
|
||||||
|
numOfSamples int64
|
||||||
|
numOfSeries int64
|
||||||
|
rrp remoteReadProcessor
|
||||||
|
chunk string
|
||||||
|
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||||
|
expectedSeries []vm.TimeSeries
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "step minute on minute time range",
|
||||||
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||||
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
||||||
|
start: "2022-11-26T11:23:05+02:00",
|
||||||
|
end: "2022-11-26T11:24:05+02:00",
|
||||||
|
numOfSamples: 2,
|
||||||
|
numOfSeries: 3,
|
||||||
|
chunk: stepper.StepMinute,
|
||||||
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||||
|
expectedSeries: []vm.TimeSeries{
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{0, 0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{100, 100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{200, 200},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "step month on month time range",
|
||||||
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
|
||||||
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
||||||
|
start: "2022-09-26T11:23:05+02:00",
|
||||||
|
end: "2022-11-26T11:24:05+02:00",
|
||||||
|
numOfSamples: 2,
|
||||||
|
numOfSeries: 3,
|
||||||
|
chunk: stepper.StepMonth,
|
||||||
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||||
|
expectedSeries: []vm.TimeSeries{
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{200},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{200}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range testCases {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
|
||||||
|
defer remoteReadServer.Close()
|
||||||
|
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||||
|
defer remoteWriteServer.Close()
|
||||||
|
|
||||||
|
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||||
|
|
||||||
|
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error create remote read client: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
start, err := time.Parse(time.RFC3339, tt.start)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error parse start time: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
end, err := time.Parse(time.RFC3339, tt.end)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error parse end time: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||||
|
|
||||||
|
remoteReadServer.SetRemoteReadSeries(rrs)
|
||||||
|
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||||
|
|
||||||
|
tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||||
|
|
||||||
|
importer, err := vm.NewImporter(tt.vmCfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create VM importer: %s", err)
|
||||||
|
}
|
||||||
|
defer importer.Close()
|
||||||
|
|
||||||
|
rmp := remoteReadProcessor{
|
||||||
|
src: rr,
|
||||||
|
dst: importer,
|
||||||
|
filter: remoteReadFilter{
|
||||||
|
timeStart: &start,
|
||||||
|
timeEnd: &end,
|
||||||
|
chunk: tt.chunk,
|
||||||
|
},
|
||||||
|
cc: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
err = rmp.run(ctx, true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to run remote read processor: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSteamRemoteRead(t *testing.T) {
|
||||||
|
|
||||||
|
var testCases = []struct {
|
||||||
|
name string
|
||||||
|
remoteReadConfig remoteread.Config
|
||||||
|
vmCfg vm.Config
|
||||||
|
start string
|
||||||
|
end string
|
||||||
|
numOfSamples int64
|
||||||
|
numOfSeries int64
|
||||||
|
rrp remoteReadProcessor
|
||||||
|
chunk string
|
||||||
|
remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
|
||||||
|
expectedSeries []vm.TimeSeries
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "step minute on minute time range",
|
||||||
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||||
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
||||||
|
start: "2022-11-26T11:23:05+02:00",
|
||||||
|
end: "2022-11-26T11:24:05+02:00",
|
||||||
|
numOfSamples: 2,
|
||||||
|
numOfSeries: 3,
|
||||||
|
chunk: stepper.StepMinute,
|
||||||
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||||
|
expectedSeries: []vm.TimeSeries{
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{0, 0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{100, 100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1669454585000, 1669454615000},
|
||||||
|
Values: []float64{200, 200},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "step month on month time range",
|
||||||
|
remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
|
||||||
|
vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true},
|
||||||
|
start: "2022-09-26T11:23:05+02:00",
|
||||||
|
end: "2022-11-26T11:24:05+02:00",
|
||||||
|
numOfSamples: 2,
|
||||||
|
numOfSeries: 3,
|
||||||
|
chunk: stepper.StepMonth,
|
||||||
|
remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
|
||||||
|
expectedSeries: []vm.TimeSeries{
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1664184185000},
|
||||||
|
Values: []float64{200},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{0},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{100},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "vm_metric_1",
|
||||||
|
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
|
||||||
|
Timestamps: []int64{1666819415000},
|
||||||
|
Values: []float64{200}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range testCases {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
|
||||||
|
defer remoteReadServer.Close()
|
||||||
|
remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
|
||||||
|
defer remoteWriteServer.Close()
|
||||||
|
|
||||||
|
tt.remoteReadConfig.Addr = remoteReadServer.URL()
|
||||||
|
|
||||||
|
rr, err := remoteread.NewClient(tt.remoteReadConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error create remote read client: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
start, err := time.Parse(time.RFC3339, tt.start)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error parse start time: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
end, err := time.Parse(time.RFC3339, tt.end)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error parse end time: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
|
||||||
|
|
||||||
|
remoteReadServer.InitMockStorage(rrs)
|
||||||
|
remoteWriteServer.ExpectedSeries(tt.expectedSeries)
|
||||||
|
|
||||||
|
tt.vmCfg.Addr = remoteWriteServer.URL()
|
||||||
|
|
||||||
|
importer, err := vm.NewImporter(tt.vmCfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create VM importer: %s", err)
|
||||||
|
}
|
||||||
|
defer importer.Close()
|
||||||
|
|
||||||
|
rmp := remoteReadProcessor{
|
||||||
|
src: rr,
|
||||||
|
dst: importer,
|
||||||
|
filter: remoteReadFilter{
|
||||||
|
timeStart: &start,
|
||||||
|
timeEnd: &end,
|
||||||
|
chunk: tt.chunk,
|
||||||
|
},
|
||||||
|
cc: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
err = rmp.run(ctx, true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to run remote read processor: %s", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
366
app/vmctl/testdata/servers_integration_test/remote_read_server.go
vendored
Normal file
366
app/vmctl/testdata/servers_integration_test/remote_read_server.go
vendored
Normal file
|
@ -0,0 +1,366 @@
|
||||||
|
package remote_read_integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
"github.com/prometheus/prometheus/storage/remote"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
maxBytesInFrame = 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
type RemoteReadServer struct {
|
||||||
|
server *httptest.Server
|
||||||
|
series []*prompb.TimeSeries
|
||||||
|
storage *MockStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the
|
||||||
|
// passed series based on the request to the read endpoint. It returns a server which should be closed after
|
||||||
|
// being used.
|
||||||
|
func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
|
||||||
|
rrs := &RemoteReadServer{
|
||||||
|
series: make([]*prompb.TimeSeries, 0),
|
||||||
|
}
|
||||||
|
rrs.server = httptest.NewServer(rrs.getReadHandler(t))
|
||||||
|
return rrs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the server.
|
||||||
|
func (rrs *RemoteReadServer) Close() {
|
||||||
|
rrs.server.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rrs *RemoteReadServer) URL() string {
|
||||||
|
return rrs.server.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
|
||||||
|
rrs.series = append(rrs.series, series...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !validateReadHeaders(t, r) {
|
||||||
|
t.Fatalf("invalid read headers")
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed, err := io.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error read body: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reqBuf, err := snappy.Decode(nil, compressed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error decode compressed data:%s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var req prompb.ReadRequest
|
||||||
|
if err := proto.Unmarshal(reqBuf, &req); err != nil {
|
||||||
|
t.Fatalf("error unmarshal read request: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &prompb.ReadResponse{
|
||||||
|
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, r := range req.Queries {
|
||||||
|
startTs := r.StartTimestampMs
|
||||||
|
endTs := r.EndTimestampMs
|
||||||
|
ts := make([]*prompb.TimeSeries, len(rrs.series))
|
||||||
|
for i, s := range rrs.series {
|
||||||
|
var samples []prompb.Sample
|
||||||
|
for _, sample := range s.Samples {
|
||||||
|
if sample.Timestamp >= startTs && sample.Timestamp < endTs {
|
||||||
|
samples = append(samples, sample)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var series prompb.TimeSeries
|
||||||
|
if len(samples) > 0 {
|
||||||
|
series.Labels = s.Labels
|
||||||
|
series.Samples = samples
|
||||||
|
}
|
||||||
|
ts[i] = &series
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
|
||||||
|
data, err := proto.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error marshal response: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
compressed = snappy.Encode(nil, data)
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||||
|
w.Header().Set("Content-Encoding", "snappy")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
||||||
|
if _, err := w.Write(compressed); err != nil {
|
||||||
|
t.Fatalf("snappy encode error: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
|
||||||
|
rrs := &RemoteReadServer{
|
||||||
|
series: make([]*prompb.TimeSeries, 0),
|
||||||
|
}
|
||||||
|
rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t))
|
||||||
|
return rrs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
|
||||||
|
rrs.storage = NewMockStorage(series)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !validateStreamReadHeaders(t, r) {
|
||||||
|
t.Fatalf("invalid read headers")
|
||||||
|
}
|
||||||
|
|
||||||
|
f, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
|
||||||
|
}
|
||||||
|
|
||||||
|
stream := remote.NewChunkedWriter(w, f)
|
||||||
|
|
||||||
|
data, err := io.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error read body: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
decodedData, err := snappy.Decode(nil, data)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error decode compressed data:%s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var req prompb.ReadRequest
|
||||||
|
if err := proto.Unmarshal(decodedData, &req); err != nil {
|
||||||
|
t.Fatalf("error unmarshal read request: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var chunks []prompb.Chunk
|
||||||
|
ctx := context.Background()
|
||||||
|
for idx, r := range req.Queries {
|
||||||
|
startTs := r.StartTimestampMs
|
||||||
|
endTs := r.EndTimestampMs
|
||||||
|
|
||||||
|
var matchers []*labels.Matcher
|
||||||
|
cb := func() (int64, error) { return 0, nil }
|
||||||
|
|
||||||
|
c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
|
||||||
|
|
||||||
|
q, err := c.ChunkQuerier(ctx, startTs, endTs)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error init chunk querier: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := q.Select(false, nil, matchers...)
|
||||||
|
for ss.Next() {
|
||||||
|
series := ss.At()
|
||||||
|
iter := series.Iterator()
|
||||||
|
labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
|
||||||
|
|
||||||
|
frameBytesLeft := maxBytesInFrame
|
||||||
|
for _, lb := range labels {
|
||||||
|
frameBytesLeft -= lb.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
isNext := iter.Next()
|
||||||
|
|
||||||
|
for isNext {
|
||||||
|
chunk := iter.At()
|
||||||
|
|
||||||
|
if chunk.Chunk == nil {
|
||||||
|
t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks = append(chunks, prompb.Chunk{
|
||||||
|
MinTimeMs: chunk.MinTime,
|
||||||
|
MaxTimeMs: chunk.MaxTime,
|
||||||
|
Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
|
||||||
|
Data: chunk.Chunk.Bytes(),
|
||||||
|
})
|
||||||
|
|
||||||
|
frameBytesLeft -= chunks[len(chunks)-1].Size()
|
||||||
|
|
||||||
|
// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
|
||||||
|
isNext = iter.Next()
|
||||||
|
if frameBytesLeft > 0 && isNext {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &prompb.ChunkedReadResponse{
|
||||||
|
ChunkedSeries: []*prompb.ChunkedSeries{
|
||||||
|
{Labels: labels, Chunks: chunks},
|
||||||
|
},
|
||||||
|
QueryIndex: int64(idx),
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := proto.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error marshal response: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stream.Write(b); err != nil {
|
||||||
|
t.Fatalf("error write to stream: %s", err)
|
||||||
|
}
|
||||||
|
chunks = chunks[:0]
|
||||||
|
rrs.storage.Reset()
|
||||||
|
}
|
||||||
|
if err := iter.Err(); err != nil {
|
||||||
|
t.Fatalf("error iterate over chunk series: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateReadHeaders(t *testing.T, r *http.Request) bool {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Encoding") != "snappy" {
|
||||||
|
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Type") != "application/x-protobuf" {
|
||||||
|
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||||
|
if remoteReadVersion == "" {
|
||||||
|
t.Fatalf("got empty prometheus remote read header")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||||
|
t.Fatalf("wrong remote version defined")
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Encoding") != "snappy" {
|
||||||
|
t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
|
||||||
|
}
|
||||||
|
if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
|
||||||
|
t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
|
||||||
|
if remoteReadVersion == "" {
|
||||||
|
t.Fatalf("got empty prometheus remote read header")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(remoteReadVersion, "0.1.") {
|
||||||
|
t.Fatalf("wrong remote version defined")
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
|
||||||
|
var ts []*prompb.TimeSeries
|
||||||
|
j := 0
|
||||||
|
for i := 0; i < int(numOfSeries); i++ {
|
||||||
|
if i%3 == 0 {
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
|
||||||
|
timeSeries := prompb.TimeSeries{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)},
|
||||||
|
{Name: "job", Value: strconv.Itoa(i)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ts = append(ts, &timeSeries)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range ts {
|
||||||
|
ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
|
||||||
|
samples := make([]prompb.Sample, 0)
|
||||||
|
delta := (endTime - startTime) / numOfSamples
|
||||||
|
|
||||||
|
t := startTime
|
||||||
|
for t != endTime {
|
||||||
|
v := 100 * int64(idx)
|
||||||
|
samples = append(samples, prompb.Sample{
|
||||||
|
Timestamp: t * 1000,
|
||||||
|
Value: float64(v),
|
||||||
|
})
|
||||||
|
t = t + delta
|
||||||
|
}
|
||||||
|
|
||||||
|
return samples
|
||||||
|
}
|
||||||
|
|
||||||
|
type MockStorage struct {
|
||||||
|
query *prompb.Query
|
||||||
|
store []*prompb.TimeSeries
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
|
||||||
|
return &MockStorage{store: series}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||||
|
if ms.query != nil {
|
||||||
|
return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
|
||||||
|
}
|
||||||
|
ms.query = query
|
||||||
|
|
||||||
|
q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))}
|
||||||
|
for _, s := range ms.store {
|
||||||
|
var samples []prompb.Sample
|
||||||
|
for _, sample := range s.Samples {
|
||||||
|
if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
|
||||||
|
samples = append(samples, sample)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var series prompb.TimeSeries
|
||||||
|
if len(samples) > 0 {
|
||||||
|
series.Labels = s.Labels
|
||||||
|
series.Samples = samples
|
||||||
|
}
|
||||||
|
|
||||||
|
q.Timeseries = append(q.Timeseries, &series)
|
||||||
|
}
|
||||||
|
return q, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MockStorage) Reset() {
|
||||||
|
ms.query = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelsToLabelsProto(labels labels.Labels) []prompb.Label {
|
||||||
|
result := make([]prompb.Label, 0, len(labels))
|
||||||
|
for _, l := range labels {
|
||||||
|
result = append(result, prompb.Label{
|
||||||
|
Name: l.Name,
|
||||||
|
Value: l.Value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
86
app/vmctl/testdata/servers_integration_test/remote_write_server.go
vendored
Normal file
86
app/vmctl/testdata/servers_integration_test/remote_write_server.go
vendored
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
package remote_read_integration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
|
||||||
|
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RemoteWriteServer struct {
|
||||||
|
server *httptest.Server
|
||||||
|
series []vm.TimeSeries
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRemoteWriteServer prepares test remote write server
|
||||||
|
func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer {
|
||||||
|
rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)}
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("/api/v1/import", rws.getWriteHandler(t))
|
||||||
|
mux.Handle("/health", rws.handlePing())
|
||||||
|
rws.server = httptest.NewServer(mux)
|
||||||
|
return rws
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the server.
|
||||||
|
func (rws *RemoteWriteServer) Close() {
|
||||||
|
rws.server.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) {
|
||||||
|
rws.series = append(rws.series, series...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rws *RemoteWriteServer) URL() string {
|
||||||
|
return rws.server.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var tss []vm.TimeSeries
|
||||||
|
scanner := bufio.NewScanner(r.Body)
|
||||||
|
var rows parser.Rows
|
||||||
|
for scanner.Scan() {
|
||||||
|
|
||||||
|
rows.Unmarshal(scanner.Text())
|
||||||
|
for _, row := range rows.Rows {
|
||||||
|
var labelPairs []vm.LabelPair
|
||||||
|
var ts vm.TimeSeries
|
||||||
|
nameValue := ""
|
||||||
|
for _, tag := range row.Tags {
|
||||||
|
if string(tag.Key) == "__name__" {
|
||||||
|
nameValue = string(tag.Value)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
labelPairs = append(labelPairs, vm.LabelPair{Name: string(tag.Key), Value: string(tag.Value)})
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.Values = append(ts.Values, row.Values...)
|
||||||
|
ts.Timestamps = append(ts.Timestamps, row.Timestamps...)
|
||||||
|
ts.Name = nameValue
|
||||||
|
ts.LabelPairs = labelPairs
|
||||||
|
tss = append(tss, ts)
|
||||||
|
}
|
||||||
|
rows.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(tss, rws.series) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.series, tss)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rws *RemoteWriteServer) handlePing() http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write([]byte("OK"))
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue