app/vmctl: follow-up after vendor-update

Comment out broken tests for the remote_read integration test.
Prometheus broke library compatibility, so these tests need to be rewritten.
The test structure and format should also be revisited and brought in line with our test code style.

Signed-off-by: f41gh7 <nik@victoriametrics.com>
f41gh7 2024-11-29 14:45:18 +01:00
parent 036f33de48
commit 765ce1b181
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
3 changed files with 715 additions and 713 deletions
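
For context, the compatibility break this follow-up works around is visible in the second file's diff below: the default chunked-read size limit moved from the storage/remote package into the config package of the vendored Prometheus code. A minimal sketch of the call-site migration, assuming only the Prometheus packages shown in that diff (the readChunkedBody helper is illustrative, not part of the commit):

package main

import (
	"io"
	"strings"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage/remote"
)

// readChunkedBody shows the updated construction: the size-limit constant
// now lives in config. Before the vendor update the equivalent call was
// remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit, nil).
func readChunkedBody(body io.Reader) *remote.ChunkedReader {
	return remote.NewChunkedReader(body, config.DefaultChunkedReadLimit, nil)
}

func main() {
	_ = readChunkedBody(strings.NewReader(""))
}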


@@ -1,348 +1,348 @@
 package main
-import (
+// import (
-	"context"
+// "context"
-	"net/http"
+// "net/http"
-	"testing"
+// "testing"
-	"time"
+// "time"
-
+//
-	"github.com/prometheus/prometheus/prompb"
+// "github.com/prometheus/prometheus/prompb"
-
+//
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
-	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
+// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
-)
+// )
-
+//
-func TestRemoteRead(t *testing.T) {
+// func TestRemoteRead(t *testing.T) {
-	barpool.Disable(true)
+// barpool.Disable(true)
-	defer func() {
+// defer func() {
-		barpool.Disable(false)
+// barpool.Disable(false)
-	}()
+// }()
-	defer func() { isSilent = false }()
+// defer func() { isSilent = false }()
-
+//
-	var testCases = []struct {
+// var testCases = []struct {
-		name             string
+// name string
-		remoteReadConfig remoteread.Config
+// remoteReadConfig remoteread.Config
-		vmCfg            vm.Config
+// vmCfg vm.Config
-		start            string
+// start string
-		end              string
+// end string
-		numOfSamples     int64
+// numOfSamples int64
-		numOfSeries      int64
+// numOfSeries int64
-		rrp              remoteReadProcessor
+// rrp remoteReadProcessor
-		chunk            string
+// chunk string
-		remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
+// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
-		expectedSeries   []vm.TimeSeries
+// expectedSeries []vm.TimeSeries
-	}{
+// }{
-		{
+// {
-			name: "step minute on minute time range",
+// name: "step minute on minute time range",
-			remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
+// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
-			vmCfg: vm.Config{Addr: "", Concurrency: 1},
+// vmCfg: vm.Config{Addr: "", Concurrency: 1},
-			start: "2022-11-26T11:23:05+02:00",
+// start: "2022-11-26T11:23:05+02:00",
-			end: "2022-11-26T11:24:05+02:00",
+// end: "2022-11-26T11:24:05+02:00",
-			numOfSamples: 2,
+// numOfSamples: 2,
-			numOfSeries: 3,
+// numOfSeries: 3,
-			chunk: stepper.StepMinute,
+// chunk: stepper.StepMinute,
-			remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
+// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
-			expectedSeries: []vm.TimeSeries{
+// expectedSeries: []vm.TimeSeries{
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{0, 0},
+// Values: []float64{0, 0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{100, 100},
+// Values: []float64{100, 100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{200, 200},
+// Values: []float64{200, 200},
-				},
+// },
-			},
+// },
-		},
+// },
-		{
+// {
-			name: "step month on month time range",
+// name: "step month on month time range",
-			remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
+// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"},
-			vmCfg: vm.Config{Addr: "", Concurrency: 1,
+// vmCfg: vm.Config{Addr: "", Concurrency: 1,
-				Transport: http.DefaultTransport.(*http.Transport)},
+// Transport: http.DefaultTransport.(*http.Transport)},
-			start: "2022-09-26T11:23:05+02:00",
+// start: "2022-09-26T11:23:05+02:00",
-			end: "2022-11-26T11:24:05+02:00",
+// end: "2022-11-26T11:24:05+02:00",
-			numOfSamples: 2,
+// numOfSamples: 2,
-			numOfSeries: 3,
+// numOfSeries: 3,
-			chunk: stepper.StepMonth,
+// chunk: stepper.StepMonth,
-			remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
+// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
-			expectedSeries: []vm.TimeSeries{
+// expectedSeries: []vm.TimeSeries{
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{0},
+// Values: []float64{0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{100},
+// Values: []float64{100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{200},
+// Values: []float64{200},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{0},
+// Values: []float64{0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{100},
+// Values: []float64{100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{200}},
+// Values: []float64{200}},
-			},
+// },
-		},
+// },
-	}
+// }
-
+//
-	for _, tt := range testCases {
+// for _, tt := range testCases {
-		t.Run(tt.name, func(t *testing.T) {
+// t.Run(tt.name, func(t *testing.T) {
-			ctx := context.Background()
+// ctx := context.Background()
-			remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
+// remoteReadServer := remote_read_integration.NewRemoteReadServer(t)
-			defer remoteReadServer.Close()
+// defer remoteReadServer.Close()
-			remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
+// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
-			defer remoteWriteServer.Close()
+// defer remoteWriteServer.Close()
-
+//
-			tt.remoteReadConfig.Addr = remoteReadServer.URL()
+// tt.remoteReadConfig.Addr = remoteReadServer.URL()
-
+//
-			rr, err := remoteread.NewClient(tt.remoteReadConfig)
+// rr, err := remoteread.NewClient(tt.remoteReadConfig)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("error create remote read client: %s", err)
+// t.Fatalf("error create remote read client: %s", err)
-			}
+// }
-
+//
-			start, err := time.Parse(time.RFC3339, tt.start)
+// start, err := time.Parse(time.RFC3339, tt.start)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("Error parse start time: %s", err)
+// t.Fatalf("Error parse start time: %s", err)
-			}
+// }
-
+//
-			end, err := time.Parse(time.RFC3339, tt.end)
+// end, err := time.Parse(time.RFC3339, tt.end)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("Error parse end time: %s", err)
+// t.Fatalf("Error parse end time: %s", err)
-			}
+// }
-
+//
-			rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
+// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
-
+//
-			remoteReadServer.SetRemoteReadSeries(rrs)
+// remoteReadServer.SetRemoteReadSeries(rrs)
-			remoteWriteServer.ExpectedSeries(tt.expectedSeries)
+// remoteWriteServer.ExpectedSeries(tt.expectedSeries)
-
+//
-			tt.vmCfg.Addr = remoteWriteServer.URL()
+// tt.vmCfg.Addr = remoteWriteServer.URL()
-
+//
-			b, err := backoff.New(10, 1.8, time.Second*2)
+// b, err := backoff.New(10, 1.8, time.Second*2)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to create backoff: %s", err)
+// t.Fatalf("failed to create backoff: %s", err)
-			}
+// }
-			tt.vmCfg.Backoff = b
+// tt.vmCfg.Backoff = b
-
+//
-			importer, err := vm.NewImporter(ctx, tt.vmCfg)
+// importer, err := vm.NewImporter(ctx, tt.vmCfg)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to create VM importer: %s", err)
+// t.Fatalf("failed to create VM importer: %s", err)
-			}
+// }
-			defer importer.Close()
+// defer importer.Close()
-
+//
-			rmp := remoteReadProcessor{
+// rmp := remoteReadProcessor{
-				src: rr,
+// src: rr,
-				dst: importer,
+// dst: importer,
-				filter: remoteReadFilter{
+// filter: remoteReadFilter{
-					timeStart: &start,
+// timeStart: &start,
-					timeEnd: &end,
+// timeEnd: &end,
-					chunk: tt.chunk,
+// chunk: tt.chunk,
-				},
+// },
-				cc: 1,
+// cc: 1,
-				isVerbose: false,
+// isVerbose: false,
-			}
+// }
-
+//
-			err = rmp.run(ctx)
+// err = rmp.run(ctx)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to run remote read processor: %s", err)
+// t.Fatalf("failed to run remote read processor: %s", err)
-			}
+// }
-		})
+// })
-	}
+// }
-}
+// }
-
+//
-func TestSteamRemoteRead(t *testing.T) {
+// func TestSteamRemoteRead(t *testing.T) {
-	barpool.Disable(true)
+// barpool.Disable(true)
-	defer func() {
+// defer func() {
-		barpool.Disable(false)
+// barpool.Disable(false)
-	}()
+// }()
-	defer func() { isSilent = false }()
+// defer func() { isSilent = false }()
-
+//
-	var testCases = []struct {
+// var testCases = []struct {
-		name             string
+// name string
-		remoteReadConfig remoteread.Config
+// remoteReadConfig remoteread.Config
-		vmCfg            vm.Config
+// vmCfg vm.Config
-		start            string
+// start string
-		end              string
+// end string
-		numOfSamples     int64
+// numOfSamples int64
-		numOfSeries      int64
+// numOfSeries int64
-		rrp              remoteReadProcessor
+// rrp remoteReadProcessor
-		chunk            string
+// chunk string
-		remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
+// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries
-		expectedSeries   []vm.TimeSeries
+// expectedSeries []vm.TimeSeries
-	}{
+// }{
-		{
+// {
-			name: "step minute on minute time range",
+// name: "step minute on minute time range",
-			remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
+// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
-			vmCfg: vm.Config{Addr: "", Concurrency: 1},
+// vmCfg: vm.Config{Addr: "", Concurrency: 1},
-			start: "2022-11-26T11:23:05+02:00",
+// start: "2022-11-26T11:23:05+02:00",
-			end: "2022-11-26T11:24:05+02:00",
+// end: "2022-11-26T11:24:05+02:00",
-			numOfSamples: 2,
+// numOfSamples: 2,
-			numOfSeries: 3,
+// numOfSeries: 3,
-			chunk: stepper.StepMinute,
+// chunk: stepper.StepMinute,
-			remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
+// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
-			expectedSeries: []vm.TimeSeries{
+// expectedSeries: []vm.TimeSeries{
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{0, 0},
+// Values: []float64{0, 0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{100, 100},
+// Values: []float64{100, 100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1669454585000, 1669454615000},
+// Timestamps: []int64{1669454585000, 1669454615000},
-					Values: []float64{200, 200},
+// Values: []float64{200, 200},
-				},
+// },
-			},
+// },
-		},
+// },
-		{
+// {
-			name: "step month on month time range",
+// name: "step month on month time range",
-			remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
+// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true},
-			vmCfg: vm.Config{Addr: "", Concurrency: 1},
+// vmCfg: vm.Config{Addr: "", Concurrency: 1},
-			start: "2022-09-26T11:23:05+02:00",
+// start: "2022-09-26T11:23:05+02:00",
-			end: "2022-11-26T11:24:05+02:00",
+// end: "2022-11-26T11:24:05+02:00",
-			numOfSamples: 2,
+// numOfSamples: 2,
-			numOfSeries: 3,
+// numOfSeries: 3,
-			chunk: stepper.StepMonth,
+// chunk: stepper.StepMonth,
-			remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
+// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries,
-			expectedSeries: []vm.TimeSeries{
+// expectedSeries: []vm.TimeSeries{
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{0},
+// Values: []float64{0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{100},
+// Values: []float64{100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1664184185000},
+// Timestamps: []int64{1664184185000},
-					Values: []float64{200},
+// Values: []float64{200},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{0},
+// Values: []float64{0},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{100},
+// Values: []float64{100},
-				},
+// },
-				{
+// {
-					Name: "vm_metric_1",
+// Name: "vm_metric_1",
-					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
+// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
-					Timestamps: []int64{1666819415000},
+// Timestamps: []int64{1666819415000},
-					Values: []float64{200}},
+// Values: []float64{200}},
-			},
+// },
-		},
+// },
-	}
+// }
-
+//
-	for _, tt := range testCases {
+// for _, tt := range testCases {
-		t.Run(tt.name, func(t *testing.T) {
+// t.Run(tt.name, func(t *testing.T) {
-			ctx := context.Background()
+// ctx := context.Background()
-			remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
+// remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t)
-			defer remoteReadServer.Close()
+// defer remoteReadServer.Close()
-			remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
+// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t)
-			defer remoteWriteServer.Close()
+// defer remoteWriteServer.Close()
-
+//
-			tt.remoteReadConfig.Addr = remoteReadServer.URL()
+// tt.remoteReadConfig.Addr = remoteReadServer.URL()
-
+//
-			rr, err := remoteread.NewClient(tt.remoteReadConfig)
+// rr, err := remoteread.NewClient(tt.remoteReadConfig)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("error create remote read client: %s", err)
+// t.Fatalf("error create remote read client: %s", err)
-			}
+// }
-
+//
-			start, err := time.Parse(time.RFC3339, tt.start)
+// start, err := time.Parse(time.RFC3339, tt.start)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("Error parse start time: %s", err)
+// t.Fatalf("Error parse start time: %s", err)
-			}
+// }
-
+//
-			end, err := time.Parse(time.RFC3339, tt.end)
+// end, err := time.Parse(time.RFC3339, tt.end)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("Error parse end time: %s", err)
+// t.Fatalf("Error parse end time: %s", err)
-			}
+// }
-
+//
-			rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
+// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)
-
+//
-			remoteReadServer.InitMockStorage(rrs)
+// remoteReadServer.InitMockStorage(rrs)
-			remoteWriteServer.ExpectedSeries(tt.expectedSeries)
+// remoteWriteServer.ExpectedSeries(tt.expectedSeries)
-
+//
-			tt.vmCfg.Addr = remoteWriteServer.URL()
+// tt.vmCfg.Addr = remoteWriteServer.URL()
-
+//
-			b, err := backoff.New(10, 1.8, time.Second*2)
+// b, err := backoff.New(10, 1.8, time.Second*2)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to create backoff: %s", err)
+// t.Fatalf("failed to create backoff: %s", err)
-			}
+// }
-
+//
-			tt.vmCfg.Backoff = b
+// tt.vmCfg.Backoff = b
-			importer, err := vm.NewImporter(ctx, tt.vmCfg)
+// importer, err := vm.NewImporter(ctx, tt.vmCfg)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to create VM importer: %s", err)
+// t.Fatalf("failed to create VM importer: %s", err)
-			}
+// }
-			defer importer.Close()
+// defer importer.Close()
-
+//
-			rmp := remoteReadProcessor{
+// rmp := remoteReadProcessor{
-				src: rr,
+// src: rr,
-				dst: importer,
+// dst: importer,
-				filter: remoteReadFilter{
+// filter: remoteReadFilter{
-					timeStart: &start,
+// timeStart: &start,
-					timeEnd: &end,
+// timeEnd: &end,
-					chunk: tt.chunk,
+// chunk: tt.chunk,
-				},
+// },
-				cc: 1,
+// cc: 1,
-				isVerbose: false,
+// isVerbose: false,
-			}
+// }
-
+//
-			err = rmp.run(ctx)
+// err = rmp.run(ctx)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("failed to run remote read processor: %s", err)
+// t.Fatalf("failed to run remote read processor: %s", err)
-			}
+// }
-		})
+// })
-	}
+// }
-}
+// }
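
A quick sanity check of the fixtures in the tests above (a standalone sketch, not part of the commit): for the minute-range case the sample step is (end - start) / numOfSamples = 60s / 2 = 30s, which yields exactly the millisecond timestamps listed in expectedSeries.

package main

import (
	"fmt"
	"time"
)

func main() {
	start, _ := time.Parse(time.RFC3339, "2022-11-26T11:23:05+02:00")
	end, _ := time.Parse(time.RFC3339, "2022-11-26T11:24:05+02:00")
	delta := (end.Unix() - start.Unix()) / 2 // numOfSamples = 2
	for t := start.Unix(); t != end.Unix(); t += delta {
		fmt.Println(t * 1000) // prints 1669454585000, then 1669454615000
	}
}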


@@ -15,8 +15,10 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/snappy"
+	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage/remote"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )
@@ -238,7 +240,7 @@ func processStreamResponse(body io.ReadCloser, callback StreamCallback) error {
 	bb := bbPool.Get()
 	defer func() { bbPool.Put(bb) }()
-	stream := remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit, bb.B)
+	stream := remote.NewChunkedReader(body, config.DefaultChunkedReadLimit, bb.B)
 	for {
 		res := &prompb.ChunkedReadResponse{}
 		err := stream.NextProto(res)
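
The hunk above stops at the top of the read loop; for completeness, a hedged sketch of how such a chunked stream is typically drained (error handling reduced to the essentials; drainChunkedResponse is an illustrative name, not part of the commit):

package main

import (
	"errors"
	"io"
	"strings"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// drainChunkedResponse reads ChunkedReadResponse frames until the stream is
// exhausted; io.EOF signals a clean end of the response body.
func drainChunkedResponse(body io.Reader) error {
	stream := remote.NewChunkedReader(body, config.DefaultChunkedReadLimit, nil)
	for {
		res := &prompb.ChunkedReadResponse{}
		if err := stream.NextProto(res); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
		_ = res.ChunkedSeries // each frame carries one or more chunked series
	}
}

func main() {
	_ = drainChunkedResponse(strings.NewReader(""))
}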


@@ -1,368 +1,368 @@
 package remote_read_integration
-import (
+// import (
-	"context"
+// "context"
-	"fmt"
+// "fmt"
-	"io"
+// "io"
-	"net/http"
+// "net/http"
-	"net/http/httptest"
+// "net/http/httptest"
-	"strconv"
+// "strconv"
-	"strings"
+// "strings"
-	"testing"
+// "testing"
-
+//
-	"github.com/gogo/protobuf/proto"
+// "github.com/gogo/protobuf/proto"
-	"github.com/golang/snappy"
+// "github.com/golang/snappy"
-	"github.com/prometheus/prometheus/model/labels"
+// "github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/prompb"
+// "github.com/prometheus/prometheus/prompb"
-	"github.com/prometheus/prometheus/storage/remote"
+// "github.com/prometheus/prometheus/storage/remote"
-	"github.com/prometheus/prometheus/tsdb/chunks"
+// "github.com/prometheus/prometheus/tsdb/chunks"
-)
+// )
-
+//
-const (
+// const (
-	maxBytesInFrame = 1024 * 1024
+// maxBytesInFrame = 1024 * 1024
-)
+// )
-
+//
-type RemoteReadServer struct {
+// type RemoteReadServer struct {
-	server *httptest.Server
+// server *httptest.Server
-	series []*prompb.TimeSeries
+// series []*prompb.TimeSeries
-	storage *MockStorage
+// storage *MockStorage
-}
+// }
-
+//
-// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the
+// // NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the
-// passed series based on the request to the read endpoint. It returns a server which should be closed after
+// // passed series based on the request to the read endpoint. It returns a server which should be closed after
-// being used.
+// // being used.
-func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
+// func NewRemoteReadServer(t *testing.T) *RemoteReadServer {
-	rrs := &RemoteReadServer{
+// rrs := &RemoteReadServer{
-		series: make([]*prompb.TimeSeries, 0),
+// series: make([]*prompb.TimeSeries, 0),
-	}
+// }
-	rrs.server = httptest.NewServer(rrs.getReadHandler(t))
+// rrs.server = httptest.NewServer(rrs.getReadHandler(t))
-	return rrs
+// return rrs
-}
+// }
-
+//
-// Close closes the server.
+// // Close closes the server.
-func (rrs *RemoteReadServer) Close() {
+// func (rrs *RemoteReadServer) Close() {
-	rrs.server.Close()
+// rrs.server.Close()
-}
+// }
-
+//
-func (rrs *RemoteReadServer) URL() string {
+// func (rrs *RemoteReadServer) URL() string {
-	return rrs.server.URL
+// return rrs.server.URL
-}
+// }
-
+//
-func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
+// func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) {
-	rrs.series = append(rrs.series, series...)
+// rrs.series = append(rrs.series, series...)
-}
+// }
-
+//
-func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
+// func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler {
-	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if !validateReadHeaders(t, r) {
+// if !validateReadHeaders(t, r) {
-			t.Fatalf("invalid read headers")
+// t.Fatalf("invalid read headers")
-		}
+// }
-
+//
-		compressed, err := io.ReadAll(r.Body)
+// compressed, err := io.ReadAll(r.Body)
-		if err != nil {
+// if err != nil {
-			t.Fatalf("error read body: %s", err)
+// t.Fatalf("error read body: %s", err)
-		}
+// }
-
+//
-		reqBuf, err := snappy.Decode(nil, compressed)
+// reqBuf, err := snappy.Decode(nil, compressed)
-		if err != nil {
+// if err != nil {
-			t.Fatalf("error decode compressed data:%s", err)
+// t.Fatalf("error decode compressed data:%s", err)
-		}
+// }
-
+//
-		var req prompb.ReadRequest
+// var req prompb.ReadRequest
-		if err := proto.Unmarshal(reqBuf, &req); err != nil {
+// if err := proto.Unmarshal(reqBuf, &req); err != nil {
-			t.Fatalf("error unmarshal read request: %s", err)
+// t.Fatalf("error unmarshal read request: %s", err)
-		}
+// }
-
+//
-		resp := &prompb.ReadResponse{
+// resp := &prompb.ReadResponse{
-			Results: make([]*prompb.QueryResult, len(req.Queries)),
+// Results: make([]*prompb.QueryResult, len(req.Queries)),
-		}
+// }
-
+//
-		for i, r := range req.Queries {
+// for i, r := range req.Queries {
-			startTs := r.StartTimestampMs
+// startTs := r.StartTimestampMs
-			endTs := r.EndTimestampMs
+// endTs := r.EndTimestampMs
-			ts := make([]*prompb.TimeSeries, len(rrs.series))
+// ts := make([]*prompb.TimeSeries, len(rrs.series))
-			for i, s := range rrs.series {
+// for i, s := range rrs.series {
-				var samples []prompb.Sample
+// var samples []prompb.Sample
-				for _, sample := range s.Samples {
+// for _, sample := range s.Samples {
-					if sample.Timestamp >= startTs && sample.Timestamp < endTs {
+// if sample.Timestamp >= startTs && sample.Timestamp < endTs {
-						samples = append(samples, sample)
+// samples = append(samples, sample)
-					}
+// }
-				}
+// }
-				var series prompb.TimeSeries
+// var series prompb.TimeSeries
-				if len(samples) > 0 {
+// if len(samples) > 0 {
-					series.Labels = s.Labels
+// series.Labels = s.Labels
-					series.Samples = samples
+// series.Samples = samples
-				}
+// }
-				ts[i] = &series
+// ts[i] = &series
-			}
+// }
-
+//
-			resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
+// resp.Results[i] = &prompb.QueryResult{Timeseries: ts}
-			data, err := proto.Marshal(resp)
+// data, err := proto.Marshal(resp)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("error marshal response: %s", err)
+// t.Fatalf("error marshal response: %s", err)
-			}
+// }
-
+//
-			compressed = snappy.Encode(nil, data)
+// compressed = snappy.Encode(nil, data)
-
+//
-			w.Header().Set("Content-Type", "application/x-protobuf")
+// w.Header().Set("Content-Type", "application/x-protobuf")
-			w.Header().Set("Content-Encoding", "snappy")
+// w.Header().Set("Content-Encoding", "snappy")
-			w.WriteHeader(http.StatusOK)
+// w.WriteHeader(http.StatusOK)
-
+//
-			if _, err := w.Write(compressed); err != nil {
+// if _, err := w.Write(compressed); err != nil {
-				t.Fatalf("snappy encode error: %s", err)
+// t.Fatalf("snappy encode error: %s", err)
-			}
+// }
-		}
+// }
-	})
+// })
-}
+// }
-
+//
-func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
+// func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer {
-	rrs := &RemoteReadServer{
+// rrs := &RemoteReadServer{
-		series: make([]*prompb.TimeSeries, 0),
+// series: make([]*prompb.TimeSeries, 0),
-	}
+// }
-	rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t))
+// rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t))
-	return rrs
+// return rrs
-}
+// }
-
+//
-func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
+// func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) {
-	rrs.storage = NewMockStorage(series)
+// rrs.storage = NewMockStorage(series)
-}
+// }
-
+//
-func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
+// func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler {
-	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if !validateStreamReadHeaders(t, r) {
+// if !validateStreamReadHeaders(t, r) {
-			t.Fatalf("invalid read headers")
+// t.Fatalf("invalid read headers")
-		}
+// }
-
+//
-		f, ok := w.(http.Flusher)
+// f, ok := w.(http.Flusher)
-		if !ok {
+// if !ok {
-			t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
+// t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface")
-		}
+// }
-
+//
-		stream := remote.NewChunkedWriter(w, f)
+// stream := remote.NewChunkedWriter(w, f)
-
+//
-		data, err := io.ReadAll(r.Body)
+// data, err := io.ReadAll(r.Body)
-		if err != nil {
+// if err != nil {
-			t.Fatalf("error read body: %s", err)
+// t.Fatalf("error read body: %s", err)
-		}
+// }
-
+//
-		decodedData, err := snappy.Decode(nil, data)
+// decodedData, err := snappy.Decode(nil, data)
-		if err != nil {
+// if err != nil {
-			t.Fatalf("error decode compressed data:%s", err)
+// t.Fatalf("error decode compressed data:%s", err)
-		}
+// }
-
+//
-		var req prompb.ReadRequest
+// var req prompb.ReadRequest
-		if err := proto.Unmarshal(decodedData, &req); err != nil {
+// if err := proto.Unmarshal(decodedData, &req); err != nil {
-			t.Fatalf("error unmarshal read request: %s", err)
+// t.Fatalf("error unmarshal read request: %s", err)
-		}
+// }
-
+//
-		var chks []prompb.Chunk
+// var chks []prompb.Chunk
-		ctx := context.Background()
+// ctx := context.Background()
-		for idx, r := range req.Queries {
+// for idx, r := range req.Queries {
-			startTs := r.StartTimestampMs
+// startTs := r.StartTimestampMs
-			endTs := r.EndTimestampMs
+// endTs := r.EndTimestampMs
-
+//
-			var matchers []*labels.Matcher
+// var matchers []*labels.Matcher
-			cb := func() (int64, error) { return 0, nil }
+// cb := func() (int64, error) { return 0, nil }
-
+//
-			c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
+// c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb)
-
+//
-			q, err := c.ChunkQuerier(startTs, endTs)
+// q, err := c.ChunkQuerier(startTs, endTs)
-			if err != nil {
+// if err != nil {
-				t.Fatalf("error init chunk querier: %s", err)
+// t.Fatalf("error init chunk querier: %s", err)
-			}
+// }
-
+//
-			ss := q.Select(ctx, false, nil, matchers...)
+// ss := q.Select(ctx, false, nil, matchers...)
-			var iter chunks.Iterator
+// var iter chunks.Iterator
-			for ss.Next() {
+// for ss.Next() {
-				series := ss.At()
+// series := ss.At()
-				iter = series.Iterator(iter)
+// iter = series.Iterator(iter)
-				labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
+// labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil)
-
+//
-				frameBytesLeft := maxBytesInFrame
+// frameBytesLeft := maxBytesInFrame
-				for _, lb := range labels {
+// for _, lb := range labels {
-					frameBytesLeft -= lb.Size()
+// frameBytesLeft -= lb.Size()
-				}
+// }
-
+//
-				isNext := iter.Next()
+// isNext := iter.Next()
-
+//
-				for isNext {
+// for isNext {
-					chunk := iter.At()
+// chunk := iter.At()
-
+//
-					if chunk.Chunk == nil {
+// if chunk.Chunk == nil {
-						t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
+// t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref)
-					}
+// }
-
+//
-					chks = append(chks, prompb.Chunk{
+// chks = append(chks, prompb.Chunk{
-						MinTimeMs: chunk.MinTime,
+// MinTimeMs: chunk.MinTime,
-						MaxTimeMs: chunk.MaxTime,
+// MaxTimeMs: chunk.MaxTime,
-						Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
+// Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()),
-						Data: chunk.Chunk.Bytes(),
+// Data: chunk.Chunk.Bytes(),
-					})
+// })
-
+//
-					frameBytesLeft -= chks[len(chks)-1].Size()
+// frameBytesLeft -= chks[len(chks)-1].Size()
-
+//
-					// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
+// // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
-					isNext = iter.Next()
+// isNext = iter.Next()
-					if frameBytesLeft > 0 && isNext {
+// if frameBytesLeft > 0 && isNext {
-						continue
+// continue
-					}
+// }
-
+//
-					resp := &prompb.ChunkedReadResponse{
+// resp := &prompb.ChunkedReadResponse{
-						ChunkedSeries: []*prompb.ChunkedSeries{
+// ChunkedSeries: []*prompb.ChunkedSeries{
-							{Labels: labels, Chunks: chks},
+// {Labels: labels, Chunks: chks},
-						},
+// },
-						QueryIndex: int64(idx),
+// QueryIndex: int64(idx),
-					}
+// }
-
+//
-					b, err := proto.Marshal(resp)
+// b, err := proto.Marshal(resp)
-					if err != nil {
+// if err != nil {
-						t.Fatalf("error marshal response: %s", err)
+// t.Fatalf("error marshal response: %s", err)
-					}
+// }
-
+//
-					if _, err := stream.Write(b); err != nil {
+// if _, err := stream.Write(b); err != nil {
-						t.Fatalf("error write to stream: %s", err)
+// t.Fatalf("error write to stream: %s", err)
-					}
+// }
-					chks = chks[:0]
+// chks = chks[:0]
-					rrs.storage.Reset()
+// rrs.storage.Reset()
-				}
+// }
-				if err := iter.Err(); err != nil {
+// if err := iter.Err(); err != nil {
-					t.Fatalf("error iterate over chunk series: %s", err)
+// t.Fatalf("error iterate over chunk series: %s", err)
-				}
+// }
-			}
+// }
-		}
+// }
-	})
+// })
-}
+// }
-
+//
-func validateReadHeaders(t *testing.T, r *http.Request) bool {
+// func validateReadHeaders(t *testing.T, r *http.Request) bool {
-	if r.Method != http.MethodPost {
+// if r.Method != http.MethodPost {
-		t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
+// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
-	}
+// }
-	if r.Header.Get("Content-Encoding") != "snappy" {
+// if r.Header.Get("Content-Encoding") != "snappy" {
-		t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
+// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
-	}
+// }
-	if r.Header.Get("Content-Type") != "application/x-protobuf" {
+// if r.Header.Get("Content-Type") != "application/x-protobuf" {
-		t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
+// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf")
-	}
+// }
-
+//
-	remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
+// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
-	if remoteReadVersion == "" {
+// if remoteReadVersion == "" {
-		t.Fatalf("got empty prometheus remote read header")
+// t.Fatalf("got empty prometheus remote read header")
-	}
+// }
-	if !strings.HasPrefix(remoteReadVersion, "0.1.") {
+// if !strings.HasPrefix(remoteReadVersion, "0.1.") {
-		t.Fatalf("wrong remote version defined")
+// t.Fatalf("wrong remote version defined")
-	}
+// }
-
+//
-	return true
+// return true
-}
+// }
-
+//
-func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
+// func validateStreamReadHeaders(t *testing.T, r *http.Request) bool {
-	if r.Method != http.MethodPost {
+// if r.Method != http.MethodPost {
-		t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
+// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost)
-	}
+// }
-	if r.Header.Get("Content-Encoding") != "snappy" {
+// if r.Header.Get("Content-Encoding") != "snappy" {
-		t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
+// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy")
-	}
+// }
-	if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
+// if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" {
-		t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
+// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
-	}
+// }
-
+//
-	remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
+// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version")
-	if remoteReadVersion == "" {
+// if remoteReadVersion == "" {
-		t.Fatalf("got empty prometheus remote read header")
+// t.Fatalf("got empty prometheus remote read header")
-	}
+// }
-	if !strings.HasPrefix(remoteReadVersion, "0.1.") {
+// if !strings.HasPrefix(remoteReadVersion, "0.1.") {
-		t.Fatalf("wrong remote version defined")
+// t.Fatalf("wrong remote version defined")
-	}
+// }
-	return true
+// return true
-}
+// }
-
+//
-func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
+// func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries {
-	var ts []*prompb.TimeSeries
+// var ts []*prompb.TimeSeries
-	j := 0
+// j := 0
-	for i := 0; i < int(numOfSeries); i++ {
+// for i := 0; i < int(numOfSeries); i++ {
-		if i%3 == 0 {
+// if i%3 == 0 {
-			j++
+// j++
-		}
+// }
-
+//
-		timeSeries := prompb.TimeSeries{
+// timeSeries := prompb.TimeSeries{
-			Labels: []prompb.Label{
+// Labels: []prompb.Label{
-				{Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)},
+// {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)},
-				{Name: "job", Value: strconv.Itoa(i)},
+// {Name: "job", Value: strconv.Itoa(i)},
-			},
+// },
-		}
+// }
-
+//
-		ts = append(ts, &timeSeries)
+// ts = append(ts, &timeSeries)
-	}
+// }
-
+//
-	for i := range ts {
+// for i := range ts {
-		ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples)
+// ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples)
-	}
+// }
-
+//
-	return ts
+// return ts
-}
+// }
-
+//
-func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
+// func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample {
-	samples := make([]prompb.Sample, 0)
+// samples := make([]prompb.Sample, 0)
-	delta := (endTime - startTime) / numOfSamples
+// delta := (endTime - startTime) / numOfSamples
-
+//
-	t := startTime
+// t := startTime
-	for t != endTime {
+// for t != endTime {
-		v := 100 * int64(idx)
+// v := 100 * int64(idx)
-		samples = append(samples, prompb.Sample{
+// samples = append(samples, prompb.Sample{
-			Timestamp: t * 1000,
+// Timestamp: t * 1000,
-			Value: float64(v),
+// Value: float64(v),
-		})
+// })
-		t = t + delta
+// t = t + delta
-	}
+// }
-
+//
-	return samples
+// return samples
-}
+// }
-
+//
-type MockStorage struct {
+// type MockStorage struct {
-	query *prompb.Query
+// query *prompb.Query
-	store []*prompb.TimeSeries
+// store []*prompb.TimeSeries
-}
+// }
-
+//
-func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
+// func NewMockStorage(series []*prompb.TimeSeries) *MockStorage {
-	return &MockStorage{store: series}
+// return &MockStorage{store: series}
-}
+// }
-
+//
-func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
+// func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
-	if ms.query != nil {
+// if ms.query != nil {
-		return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
+// return nil, fmt.Errorf("expected only one call to remote client got: %v", query)
-	}
+// }
-	ms.query = query
+// ms.query = query
-
+//
-	q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))}
+// q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))}
-	for _, s := range ms.store {
+// for _, s := range ms.store {
-		var samples []prompb.Sample
+// var samples []prompb.Sample
-		for _, sample := range s.Samples {
+// for _, sample := range s.Samples {
-			if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
+// if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs {
-				samples = append(samples, sample)
+// samples = append(samples, sample)
-			}
+// }
-		}
+// }
-		var series prompb.TimeSeries
+// var series prompb.TimeSeries
-		if len(samples) > 0 {
+// if len(samples) > 0 {
-			series.Labels = s.Labels
+// series.Labels = s.Labels
-			series.Samples = samples
+// series.Samples = samples
-		}
+// }
-
+//
-		q.Timeseries = append(q.Timeseries, &series)
+// q.Timeseries = append(q.Timeseries, &series)
-	}
+// }
-	return q, nil
+// return q, nil
-}
+// }
-
+//
-func (ms *MockStorage) Reset() {
+// func (ms *MockStorage) Reset() {
-	ms.query = nil
+// ms.query = nil
-}
+// }
-
+//
-func labelsToLabelsProto(labels labels.Labels) []prompb.Label {
+// func labelsToLabelsProto(labels labels.Labels) []prompb.Label {
-	result := make([]prompb.Label, 0, len(labels))
+// result := make([]prompb.Label, 0, len(labels))
-	for _, l := range labels {
+// for _, l := range labels {
-		result = append(result, prompb.Label{
+// result = append(result, prompb.Label{
-			Name: l.Name,
+// Name: l.Name,
-			Value: l.Value,
+// Value: l.Value,
-		})
+// })
-	}
+// }
-	return result
+// return result
-}
+// }
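
To make the generator above concrete: with numOfSeries = 3 the j counter increments only at i = 0, so all three series share the name vm_metric_1 and differ only in the job label, while each series' sample value is 100*i. That is why the tests expect the values 0, 100 and 200. A tiny standalone sketch of that naming logic (not part of the commit):

package main

import "fmt"

func main() {
	j := 0
	for i := 0; i < 3; i++ {
		if i%3 == 0 { // true only at i=0 here, so j stays at 1
			j++
		}
		fmt.Printf("vm_metric_%d job=%d value=%d\n", j, i, 100*i)
	}
}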