From 765ce1b181261e3b2d3adb1679873dc15a71ec83 Mon Sep 17 00:00:00 2001
From: f41gh7
Date: Fri, 29 Nov 2024 14:45:18 +0100
Subject: [PATCH] app/vmctl: follow-up after vendor-update

Comment out the broken remote_read integration tests. Prometheus broke
library compatibility, so the tests have to be rewritten. The test
structure and format should also be revisited and improved to match our
test code style.

Signed-off-by: f41gh7
---
 app/vmctl/remote_read_test.go      | 692 ++++++++---------
 app/vmctl/remoteread/remoteread.go |   4 +-
 .../remote_read_server.go          | 732 +++++++++---------
 3 files changed, 715 insertions(+), 713 deletions(-)

diff --git a/app/vmctl/remote_read_test.go b/app/vmctl/remote_read_test.go
index 023b63c8c..476efee44 100644
--- a/app/vmctl/remote_read_test.go
+++ b/app/vmctl/remote_read_test.go
@@ -1,348 +1,348 @@
 package main
-import ( - "context" - "net/http" - "testing" - "time" - - "github.com/prometheus/prometheus/prompb" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" -) - -func TestRemoteRead(t *testing.T) { - barpool.Disable(true) - defer func() { - barpool.Disable(false) - }() - defer func() { isSilent = false }() - - var testCases = []struct { - name string - remoteReadConfig remoteread.Config - vmCfg vm.Config - start string - end string - numOfSamples int64 - numOfSeries int64 - rrp remoteReadProcessor - chunk string - remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries - expectedSeries []vm.TimeSeries - }{ - { - name: "step minute on minute time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1}, - start: "2022-11-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMinute, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{0, 0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{100, 100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{200, 200}, - }, - }, - }, - { - name: "step month on month time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, - Transport: http.DefaultTransport.(*http.Transport)}, - start: "2022-09-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMonth, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: 
[]int64{1664184185000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{200}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{200}}, - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - remoteReadServer := remote_read_integration.NewRemoteReadServer(t) - defer remoteReadServer.Close() - remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) - defer remoteWriteServer.Close() - - tt.remoteReadConfig.Addr = remoteReadServer.URL() - - rr, err := remoteread.NewClient(tt.remoteReadConfig) - if err != nil { - t.Fatalf("error create remote read client: %s", err) - } - - start, err := time.Parse(time.RFC3339, tt.start) - if err != nil { - t.Fatalf("Error parse start time: %s", err) - } - - end, err := time.Parse(time.RFC3339, tt.end) - if err != nil { - t.Fatalf("Error parse end time: %s", err) - } - - rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) - - remoteReadServer.SetRemoteReadSeries(rrs) - remoteWriteServer.ExpectedSeries(tt.expectedSeries) - - tt.vmCfg.Addr = remoteWriteServer.URL() - - b, err := backoff.New(10, 1.8, time.Second*2) - if err != nil { - t.Fatalf("failed to create backoff: %s", err) - } - tt.vmCfg.Backoff = b - - importer, err := vm.NewImporter(ctx, tt.vmCfg) - if err != nil { - t.Fatalf("failed to create VM importer: %s", err) - } - defer importer.Close() - - rmp := remoteReadProcessor{ - src: rr, - dst: importer, - filter: remoteReadFilter{ - timeStart: &start, - timeEnd: &end, - chunk: tt.chunk, - }, - cc: 1, - isVerbose: false, - } - - err = rmp.run(ctx) - if err != nil { - t.Fatalf("failed to run remote read processor: %s", err) - } - }) - } -} - -func TestSteamRemoteRead(t *testing.T) { - barpool.Disable(true) - defer func() { - barpool.Disable(false) - }() - defer func() { isSilent = false }() - - var testCases = []struct { - name string - remoteReadConfig remoteread.Config - vmCfg vm.Config - start string - end string - numOfSamples int64 - numOfSeries int64 - rrp remoteReadProcessor - chunk string - remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries - expectedSeries []vm.TimeSeries - }{ - { - name: "step minute on minute time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1}, - start: "2022-11-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMinute, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{0, 0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{100, 100}, - }, - { - Name: "vm_metric_1", - 
LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{200, 200}, - }, - }, - }, - { - name: "step month on month time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1}, - start: "2022-09-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMonth, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{200}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{200}}, - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t) - defer remoteReadServer.Close() - remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) - defer remoteWriteServer.Close() - - tt.remoteReadConfig.Addr = remoteReadServer.URL() - - rr, err := remoteread.NewClient(tt.remoteReadConfig) - if err != nil { - t.Fatalf("error create remote read client: %s", err) - } - - start, err := time.Parse(time.RFC3339, tt.start) - if err != nil { - t.Fatalf("Error parse start time: %s", err) - } - - end, err := time.Parse(time.RFC3339, tt.end) - if err != nil { - t.Fatalf("Error parse end time: %s", err) - } - - rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) - - remoteReadServer.InitMockStorage(rrs) - remoteWriteServer.ExpectedSeries(tt.expectedSeries) - - tt.vmCfg.Addr = remoteWriteServer.URL() - - b, err := backoff.New(10, 1.8, time.Second*2) - if err != nil { - t.Fatalf("failed to create backoff: %s", err) - } - - tt.vmCfg.Backoff = b - importer, err := vm.NewImporter(ctx, tt.vmCfg) - if err != nil { - t.Fatalf("failed to create VM importer: %s", err) - } - defer importer.Close() - - rmp := remoteReadProcessor{ - src: rr, - dst: importer, - filter: remoteReadFilter{ - timeStart: &start, - timeEnd: &end, - chunk: tt.chunk, - }, - cc: 1, - isVerbose: false, - } - - err = rmp.run(ctx) - if err != nil { - t.Fatalf("failed to run remote read processor: %s", err) - } - }) - } -} +// import ( +// "context" +// "net/http" +// "testing" +// "time" +// +// "github.com/prometheus/prometheus/prompb" +// +// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" +// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/barpool" +// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" +// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" +// 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" +// "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" +// ) +// +// func TestRemoteRead(t *testing.T) { +// barpool.Disable(true) +// defer func() { +// barpool.Disable(false) +// }() +// defer func() { isSilent = false }() +// +// var testCases = []struct { +// name string +// remoteReadConfig remoteread.Config +// vmCfg vm.Config +// start string +// end string +// numOfSamples int64 +// numOfSeries int64 +// rrp remoteReadProcessor +// chunk string +// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries +// expectedSeries []vm.TimeSeries +// }{ +// { +// name: "step minute on minute time range", +// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, +// vmCfg: vm.Config{Addr: "", Concurrency: 1}, +// start: "2022-11-26T11:23:05+02:00", +// end: "2022-11-26T11:24:05+02:00", +// numOfSamples: 2, +// numOfSeries: 3, +// chunk: stepper.StepMinute, +// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, +// expectedSeries: []vm.TimeSeries{ +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{0, 0}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{100, 100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{200, 200}, +// }, +// }, +// }, +// { +// name: "step month on month time range", +// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, +// vmCfg: vm.Config{Addr: "", Concurrency: 1, +// Transport: http.DefaultTransport.(*http.Transport)}, +// start: "2022-09-26T11:23:05+02:00", +// end: "2022-11-26T11:24:05+02:00", +// numOfSamples: 2, +// numOfSeries: 3, +// chunk: stepper.StepMonth, +// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, +// expectedSeries: []vm.TimeSeries{ +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{0}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{200}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{0}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{200}}, +// }, +// }, +// } +// +// for _, tt := range testCases { +// t.Run(tt.name, func(t *testing.T) { +// ctx := context.Background() +// remoteReadServer := remote_read_integration.NewRemoteReadServer(t) +// defer remoteReadServer.Close() +// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) +// defer remoteWriteServer.Close() +// +// 
tt.remoteReadConfig.Addr = remoteReadServer.URL() +// +// rr, err := remoteread.NewClient(tt.remoteReadConfig) +// if err != nil { +// t.Fatalf("error create remote read client: %s", err) +// } +// +// start, err := time.Parse(time.RFC3339, tt.start) +// if err != nil { +// t.Fatalf("Error parse start time: %s", err) +// } +// +// end, err := time.Parse(time.RFC3339, tt.end) +// if err != nil { +// t.Fatalf("Error parse end time: %s", err) +// } +// +// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) +// +// remoteReadServer.SetRemoteReadSeries(rrs) +// remoteWriteServer.ExpectedSeries(tt.expectedSeries) +// +// tt.vmCfg.Addr = remoteWriteServer.URL() +// +// b, err := backoff.New(10, 1.8, time.Second*2) +// if err != nil { +// t.Fatalf("failed to create backoff: %s", err) +// } +// tt.vmCfg.Backoff = b +// +// importer, err := vm.NewImporter(ctx, tt.vmCfg) +// if err != nil { +// t.Fatalf("failed to create VM importer: %s", err) +// } +// defer importer.Close() +// +// rmp := remoteReadProcessor{ +// src: rr, +// dst: importer, +// filter: remoteReadFilter{ +// timeStart: &start, +// timeEnd: &end, +// chunk: tt.chunk, +// }, +// cc: 1, +// isVerbose: false, +// } +// +// err = rmp.run(ctx) +// if err != nil { +// t.Fatalf("failed to run remote read processor: %s", err) +// } +// }) +// } +// } +// +// func TestSteamRemoteRead(t *testing.T) { +// barpool.Disable(true) +// defer func() { +// barpool.Disable(false) +// }() +// defer func() { isSilent = false }() +// +// var testCases = []struct { +// name string +// remoteReadConfig remoteread.Config +// vmCfg vm.Config +// start string +// end string +// numOfSamples int64 +// numOfSeries int64 +// rrp remoteReadProcessor +// chunk string +// remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries +// expectedSeries []vm.TimeSeries +// }{ +// { +// name: "step minute on minute time range", +// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, +// vmCfg: vm.Config{Addr: "", Concurrency: 1}, +// start: "2022-11-26T11:23:05+02:00", +// end: "2022-11-26T11:24:05+02:00", +// numOfSamples: 2, +// numOfSeries: 3, +// chunk: stepper.StepMinute, +// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, +// expectedSeries: []vm.TimeSeries{ +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{0, 0}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{100, 100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1669454585000, 1669454615000}, +// Values: []float64{200, 200}, +// }, +// }, +// }, +// { +// name: "step month on month time range", +// remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, +// vmCfg: vm.Config{Addr: "", Concurrency: 1}, +// start: "2022-09-26T11:23:05+02:00", +// end: "2022-11-26T11:24:05+02:00", +// numOfSamples: 2, +// numOfSeries: 3, +// chunk: stepper.StepMonth, +// remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, +// expectedSeries: []vm.TimeSeries{ +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{0}, +// }, +// 
{ +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1664184185000}, +// Values: []float64{200}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{0}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{100}, +// }, +// { +// Name: "vm_metric_1", +// LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, +// Timestamps: []int64{1666819415000}, +// Values: []float64{200}}, +// }, +// }, +// } +// +// for _, tt := range testCases { +// t.Run(tt.name, func(t *testing.T) { +// ctx := context.Background() +// remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t) +// defer remoteReadServer.Close() +// remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) +// defer remoteWriteServer.Close() +// +// tt.remoteReadConfig.Addr = remoteReadServer.URL() +// +// rr, err := remoteread.NewClient(tt.remoteReadConfig) +// if err != nil { +// t.Fatalf("error create remote read client: %s", err) +// } +// +// start, err := time.Parse(time.RFC3339, tt.start) +// if err != nil { +// t.Fatalf("Error parse start time: %s", err) +// } +// +// end, err := time.Parse(time.RFC3339, tt.end) +// if err != nil { +// t.Fatalf("Error parse end time: %s", err) +// } +// +// rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) +// +// remoteReadServer.InitMockStorage(rrs) +// remoteWriteServer.ExpectedSeries(tt.expectedSeries) +// +// tt.vmCfg.Addr = remoteWriteServer.URL() +// +// b, err := backoff.New(10, 1.8, time.Second*2) +// if err != nil { +// t.Fatalf("failed to create backoff: %s", err) +// } +// +// tt.vmCfg.Backoff = b +// importer, err := vm.NewImporter(ctx, tt.vmCfg) +// if err != nil { +// t.Fatalf("failed to create VM importer: %s", err) +// } +// defer importer.Close() +// +// rmp := remoteReadProcessor{ +// src: rr, +// dst: importer, +// filter: remoteReadFilter{ +// timeStart: &start, +// timeEnd: &end, +// chunk: tt.chunk, +// }, +// cc: 1, +// isVerbose: false, +// } +// +// err = rmp.run(ctx) +// if err != nil { +// t.Fatalf("failed to run remote read processor: %s", err) +// } +// }) +// } +// } diff --git a/app/vmctl/remoteread/remoteread.go b/app/vmctl/remoteread/remoteread.go index 2bf62a138..c76024a8f 100644 --- a/app/vmctl/remoteread/remoteread.go +++ b/app/vmctl/remoteread/remoteread.go @@ -15,8 +15,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/chunkenc" ) @@ -238,7 +240,7 @@ func processStreamResponse(body io.ReadCloser, callback StreamCallback) error { bb := bbPool.Get() defer func() { bbPool.Put(bb) }() - stream := remote.NewChunkedReader(body, remote.DefaultChunkedReadLimit, bb.B) + stream := remote.NewChunkedReader(body, config.DefaultChunkedReadLimit, bb.B) for { res := &prompb.ChunkedReadResponse{} err := stream.NextProto(res) diff --git a/app/vmctl/testdata/servers_integration_test/remote_read_server.go 
b/app/vmctl/testdata/servers_integration_test/remote_read_server.go index 47c944df6..b48fd1e76 100644 --- a/app/vmctl/testdata/servers_integration_test/remote_read_server.go +++ b/app/vmctl/testdata/servers_integration_test/remote_read_server.go @@ -1,368 +1,368 @@ package remote_read_integration -import ( - "context" - "fmt" - "io" - "net/http" - "net/http/httptest" - "strconv" - "strings" - "testing" - - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage/remote" - "github.com/prometheus/prometheus/tsdb/chunks" -) - -const ( - maxBytesInFrame = 1024 * 1024 -) - -type RemoteReadServer struct { - server *httptest.Server - series []*prompb.TimeSeries - storage *MockStorage -} - -// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the -// passed series based on the request to the read endpoint. It returns a server which should be closed after -// being used. -func NewRemoteReadServer(t *testing.T) *RemoteReadServer { - rrs := &RemoteReadServer{ - series: make([]*prompb.TimeSeries, 0), - } - rrs.server = httptest.NewServer(rrs.getReadHandler(t)) - return rrs -} - -// Close closes the server. -func (rrs *RemoteReadServer) Close() { - rrs.server.Close() -} - -func (rrs *RemoteReadServer) URL() string { - return rrs.server.URL -} - -func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) { - rrs.series = append(rrs.series, series...) -} - -func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !validateReadHeaders(t, r) { - t.Fatalf("invalid read headers") - } - - compressed, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("error read body: %s", err) - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - t.Fatalf("error decode compressed data:%s", err) - } - - var req prompb.ReadRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - t.Fatalf("error unmarshal read request: %s", err) - } - - resp := &prompb.ReadResponse{ - Results: make([]*prompb.QueryResult, len(req.Queries)), - } - - for i, r := range req.Queries { - startTs := r.StartTimestampMs - endTs := r.EndTimestampMs - ts := make([]*prompb.TimeSeries, len(rrs.series)) - for i, s := range rrs.series { - var samples []prompb.Sample - for _, sample := range s.Samples { - if sample.Timestamp >= startTs && sample.Timestamp < endTs { - samples = append(samples, sample) - } - } - var series prompb.TimeSeries - if len(samples) > 0 { - series.Labels = s.Labels - series.Samples = samples - } - ts[i] = &series - } - - resp.Results[i] = &prompb.QueryResult{Timeseries: ts} - data, err := proto.Marshal(resp) - if err != nil { - t.Fatalf("error marshal response: %s", err) - } - - compressed = snappy.Encode(nil, data) - - w.Header().Set("Content-Type", "application/x-protobuf") - w.Header().Set("Content-Encoding", "snappy") - w.WriteHeader(http.StatusOK) - - if _, err := w.Write(compressed); err != nil { - t.Fatalf("snappy encode error: %s", err) - } - } - }) -} - -func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer { - rrs := &RemoteReadServer{ - series: make([]*prompb.TimeSeries, 0), - } - rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t)) - return rrs -} - -func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) { - rrs.storage = NewMockStorage(series) -} - -func (rrs 
*RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !validateStreamReadHeaders(t, r) { - t.Fatalf("invalid read headers") - } - - f, ok := w.(http.Flusher) - if !ok { - t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface") - } - - stream := remote.NewChunkedWriter(w, f) - - data, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("error read body: %s", err) - } - - decodedData, err := snappy.Decode(nil, data) - if err != nil { - t.Fatalf("error decode compressed data:%s", err) - } - - var req prompb.ReadRequest - if err := proto.Unmarshal(decodedData, &req); err != nil { - t.Fatalf("error unmarshal read request: %s", err) - } - - var chks []prompb.Chunk - ctx := context.Background() - for idx, r := range req.Queries { - startTs := r.StartTimestampMs - endTs := r.EndTimestampMs - - var matchers []*labels.Matcher - cb := func() (int64, error) { return 0, nil } - - c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb) - - q, err := c.ChunkQuerier(startTs, endTs) - if err != nil { - t.Fatalf("error init chunk querier: %s", err) - } - - ss := q.Select(ctx, false, nil, matchers...) - var iter chunks.Iterator - for ss.Next() { - series := ss.At() - iter = series.Iterator(iter) - labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil) - - frameBytesLeft := maxBytesInFrame - for _, lb := range labels { - frameBytesLeft -= lb.Size() - } - - isNext := iter.Next() - - for isNext { - chunk := iter.At() - - if chunk.Chunk == nil { - t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref) - } - - chks = append(chks, prompb.Chunk{ - MinTimeMs: chunk.MinTime, - MaxTimeMs: chunk.MaxTime, - Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()), - Data: chunk.Chunk.Bytes(), - }) - - frameBytesLeft -= chks[len(chks)-1].Size() - - // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. 
- isNext = iter.Next() - if frameBytesLeft > 0 && isNext { - continue - } - - resp := &prompb.ChunkedReadResponse{ - ChunkedSeries: []*prompb.ChunkedSeries{ - {Labels: labels, Chunks: chks}, - }, - QueryIndex: int64(idx), - } - - b, err := proto.Marshal(resp) - if err != nil { - t.Fatalf("error marshal response: %s", err) - } - - if _, err := stream.Write(b); err != nil { - t.Fatalf("error write to stream: %s", err) - } - chks = chks[:0] - rrs.storage.Reset() - } - if err := iter.Err(); err != nil { - t.Fatalf("error iterate over chunk series: %s", err) - } - } - } - }) -} - -func validateReadHeaders(t *testing.T, r *http.Request) bool { - if r.Method != http.MethodPost { - t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) - } - if r.Header.Get("Content-Encoding") != "snappy" { - t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") - } - if r.Header.Get("Content-Type") != "application/x-protobuf" { - t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf") - } - - remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") - if remoteReadVersion == "" { - t.Fatalf("got empty prometheus remote read header") - } - if !strings.HasPrefix(remoteReadVersion, "0.1.") { - t.Fatalf("wrong remote version defined") - } - - return true -} - -func validateStreamReadHeaders(t *testing.T, r *http.Request) bool { - if r.Method != http.MethodPost { - t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) - } - if r.Header.Get("Content-Encoding") != "snappy" { - t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") - } - if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" { - t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") - } - - remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") - if remoteReadVersion == "" { - t.Fatalf("got empty prometheus remote read header") - } - if !strings.HasPrefix(remoteReadVersion, "0.1.") { - t.Fatalf("wrong remote version defined") - } - return true -} - -func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries { - var ts []*prompb.TimeSeries - j := 0 - for i := 0; i < int(numOfSeries); i++ { - if i%3 == 0 { - j++ - } - - timeSeries := prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)}, - {Name: "job", Value: strconv.Itoa(i)}, - }, - } - - ts = append(ts, &timeSeries) - } - - for i := range ts { - ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples) - } - - return ts -} - -func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample { - samples := make([]prompb.Sample, 0) - delta := (endTime - startTime) / numOfSamples - - t := startTime - for t != endTime { - v := 100 * int64(idx) - samples = append(samples, prompb.Sample{ - Timestamp: t * 1000, - Value: float64(v), - }) - t = t + delta - } - - return samples -} - -type MockStorage struct { - query *prompb.Query - store []*prompb.TimeSeries -} - -func NewMockStorage(series []*prompb.TimeSeries) *MockStorage { - return &MockStorage{store: series} -} - -func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { - if ms.query != nil { - return nil, fmt.Errorf("expected only one call 
to remote client got: %v", query) - } - ms.query = query - - q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))} - for _, s := range ms.store { - var samples []prompb.Sample - for _, sample := range s.Samples { - if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs { - samples = append(samples, sample) - } - } - var series prompb.TimeSeries - if len(samples) > 0 { - series.Labels = s.Labels - series.Samples = samples - } - - q.Timeseries = append(q.Timeseries, &series) - } - return q, nil -} - -func (ms *MockStorage) Reset() { - ms.query = nil -} - -func labelsToLabelsProto(labels labels.Labels) []prompb.Label { - result := make([]prompb.Label, 0, len(labels)) - for _, l := range labels { - result = append(result, prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - } - return result -} +// import ( +// "context" +// "fmt" +// "io" +// "net/http" +// "net/http/httptest" +// "strconv" +// "strings" +// "testing" +// +// "github.com/gogo/protobuf/proto" +// "github.com/golang/snappy" +// "github.com/prometheus/prometheus/model/labels" +// "github.com/prometheus/prometheus/prompb" +// "github.com/prometheus/prometheus/storage/remote" +// "github.com/prometheus/prometheus/tsdb/chunks" +// ) +// +// const ( +// maxBytesInFrame = 1024 * 1024 +// ) +// +// type RemoteReadServer struct { +// server *httptest.Server +// series []*prompb.TimeSeries +// storage *MockStorage +// } +// +// // NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the +// // passed series based on the request to the read endpoint. It returns a server which should be closed after +// // being used. +// func NewRemoteReadServer(t *testing.T) *RemoteReadServer { +// rrs := &RemoteReadServer{ +// series: make([]*prompb.TimeSeries, 0), +// } +// rrs.server = httptest.NewServer(rrs.getReadHandler(t)) +// return rrs +// } +// +// // Close closes the server. +// func (rrs *RemoteReadServer) Close() { +// rrs.server.Close() +// } +// +// func (rrs *RemoteReadServer) URL() string { +// return rrs.server.URL +// } +// +// func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) { +// rrs.series = append(rrs.series, series...) 
+// } +// +// func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler { +// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// if !validateReadHeaders(t, r) { +// t.Fatalf("invalid read headers") +// } +// +// compressed, err := io.ReadAll(r.Body) +// if err != nil { +// t.Fatalf("error read body: %s", err) +// } +// +// reqBuf, err := snappy.Decode(nil, compressed) +// if err != nil { +// t.Fatalf("error decode compressed data:%s", err) +// } +// +// var req prompb.ReadRequest +// if err := proto.Unmarshal(reqBuf, &req); err != nil { +// t.Fatalf("error unmarshal read request: %s", err) +// } +// +// resp := &prompb.ReadResponse{ +// Results: make([]*prompb.QueryResult, len(req.Queries)), +// } +// +// for i, r := range req.Queries { +// startTs := r.StartTimestampMs +// endTs := r.EndTimestampMs +// ts := make([]*prompb.TimeSeries, len(rrs.series)) +// for i, s := range rrs.series { +// var samples []prompb.Sample +// for _, sample := range s.Samples { +// if sample.Timestamp >= startTs && sample.Timestamp < endTs { +// samples = append(samples, sample) +// } +// } +// var series prompb.TimeSeries +// if len(samples) > 0 { +// series.Labels = s.Labels +// series.Samples = samples +// } +// ts[i] = &series +// } +// +// resp.Results[i] = &prompb.QueryResult{Timeseries: ts} +// data, err := proto.Marshal(resp) +// if err != nil { +// t.Fatalf("error marshal response: %s", err) +// } +// +// compressed = snappy.Encode(nil, data) +// +// w.Header().Set("Content-Type", "application/x-protobuf") +// w.Header().Set("Content-Encoding", "snappy") +// w.WriteHeader(http.StatusOK) +// +// if _, err := w.Write(compressed); err != nil { +// t.Fatalf("snappy encode error: %s", err) +// } +// } +// }) +// } +// +// func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer { +// rrs := &RemoteReadServer{ +// series: make([]*prompb.TimeSeries, 0), +// } +// rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t)) +// return rrs +// } +// +// func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) { +// rrs.storage = NewMockStorage(series) +// } +// +// func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler { +// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// if !validateStreamReadHeaders(t, r) { +// t.Fatalf("invalid read headers") +// } +// +// f, ok := w.(http.Flusher) +// if !ok { +// t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface") +// } +// +// stream := remote.NewChunkedWriter(w, f) +// +// data, err := io.ReadAll(r.Body) +// if err != nil { +// t.Fatalf("error read body: %s", err) +// } +// +// decodedData, err := snappy.Decode(nil, data) +// if err != nil { +// t.Fatalf("error decode compressed data:%s", err) +// } +// +// var req prompb.ReadRequest +// if err := proto.Unmarshal(decodedData, &req); err != nil { +// t.Fatalf("error unmarshal read request: %s", err) +// } +// +// var chks []prompb.Chunk +// ctx := context.Background() +// for idx, r := range req.Queries { +// startTs := r.StartTimestampMs +// endTs := r.EndTimestampMs +// +// var matchers []*labels.Matcher +// cb := func() (int64, error) { return 0, nil } +// +// c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb) +// +// q, err := c.ChunkQuerier(startTs, endTs) +// if err != nil { +// t.Fatalf("error init chunk querier: %s", err) +// } +// +// ss := q.Select(ctx, false, nil, matchers...) 
+// var iter chunks.Iterator +// for ss.Next() { +// series := ss.At() +// iter = series.Iterator(iter) +// labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil) +// +// frameBytesLeft := maxBytesInFrame +// for _, lb := range labels { +// frameBytesLeft -= lb.Size() +// } +// +// isNext := iter.Next() +// +// for isNext { +// chunk := iter.At() +// +// if chunk.Chunk == nil { +// t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref) +// } +// +// chks = append(chks, prompb.Chunk{ +// MinTimeMs: chunk.MinTime, +// MaxTimeMs: chunk.MaxTime, +// Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()), +// Data: chunk.Chunk.Bytes(), +// }) +// +// frameBytesLeft -= chks[len(chks)-1].Size() +// +// // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. +// isNext = iter.Next() +// if frameBytesLeft > 0 && isNext { +// continue +// } +// +// resp := &prompb.ChunkedReadResponse{ +// ChunkedSeries: []*prompb.ChunkedSeries{ +// {Labels: labels, Chunks: chks}, +// }, +// QueryIndex: int64(idx), +// } +// +// b, err := proto.Marshal(resp) +// if err != nil { +// t.Fatalf("error marshal response: %s", err) +// } +// +// if _, err := stream.Write(b); err != nil { +// t.Fatalf("error write to stream: %s", err) +// } +// chks = chks[:0] +// rrs.storage.Reset() +// } +// if err := iter.Err(); err != nil { +// t.Fatalf("error iterate over chunk series: %s", err) +// } +// } +// } +// }) +// } +// +// func validateReadHeaders(t *testing.T, r *http.Request) bool { +// if r.Method != http.MethodPost { +// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) +// } +// if r.Header.Get("Content-Encoding") != "snappy" { +// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") +// } +// if r.Header.Get("Content-Type") != "application/x-protobuf" { +// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf") +// } +// +// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") +// if remoteReadVersion == "" { +// t.Fatalf("got empty prometheus remote read header") +// } +// if !strings.HasPrefix(remoteReadVersion, "0.1.") { +// t.Fatalf("wrong remote version defined") +// } +// +// return true +// } +// +// func validateStreamReadHeaders(t *testing.T, r *http.Request) bool { +// if r.Method != http.MethodPost { +// t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) +// } +// if r.Header.Get("Content-Encoding") != "snappy" { +// t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") +// } +// if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" { +// t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") +// } +// +// remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") +// if remoteReadVersion == "" { +// t.Fatalf("got empty prometheus remote read header") +// } +// if !strings.HasPrefix(remoteReadVersion, "0.1.") { +// t.Fatalf("wrong remote version defined") +// } +// return true +// } +// +// func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries { +// var ts []*prompb.TimeSeries +// j := 0 +// for i := 0; i < int(numOfSeries); i++ { +// if i%3 == 0 { +// j++ +// } +// +// timeSeries := prompb.TimeSeries{ +// Labels: 
[]prompb.Label{ +// {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)}, +// {Name: "job", Value: strconv.Itoa(i)}, +// }, +// } +// +// ts = append(ts, &timeSeries) +// } +// +// for i := range ts { +// ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples) +// } +// +// return ts +// } +// +// func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample { +// samples := make([]prompb.Sample, 0) +// delta := (endTime - startTime) / numOfSamples +// +// t := startTime +// for t != endTime { +// v := 100 * int64(idx) +// samples = append(samples, prompb.Sample{ +// Timestamp: t * 1000, +// Value: float64(v), +// }) +// t = t + delta +// } +// +// return samples +// } +// +// type MockStorage struct { +// query *prompb.Query +// store []*prompb.TimeSeries +// } +// +// func NewMockStorage(series []*prompb.TimeSeries) *MockStorage { +// return &MockStorage{store: series} +// } +// +// func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { +// if ms.query != nil { +// return nil, fmt.Errorf("expected only one call to remote client got: %v", query) +// } +// ms.query = query +// +// q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))} +// for _, s := range ms.store { +// var samples []prompb.Sample +// for _, sample := range s.Samples { +// if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs { +// samples = append(samples, sample) +// } +// } +// var series prompb.TimeSeries +// if len(samples) > 0 { +// series.Labels = s.Labels +// series.Samples = samples +// } +// +// q.Timeseries = append(q.Timeseries, &series) +// } +// return q, nil +// } +// +// func (ms *MockStorage) Reset() { +// ms.query = nil +// } +// +// func labelsToLabelsProto(labels labels.Labels) []prompb.Label { +// result := make([]prompb.Label, 0, len(labels)) +// for _, l := range labels { +// result = append(result, prompb.Label{ +// Name: l.Name, +// Value: l.Value, +// }) +// } +// return result +// }
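
For reference, the only functional change in this patch is in app/vmctl/remoteread/remoteread.go: after the vendor update, Prometheus exports the chunked-read size limit from its config package instead of storage/remote. The sketch below is illustrative and not code from this repository (the package and the helper name drainChunkedReadResponse are hypothetical); it shows the streamed remote-read decoding loop against the new API, matching the shape of processStreamResponse above.

package remotereadexample

import (
	"fmt"
	"io"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/storage/remote"
)

// drainChunkedReadResponse decodes a streamed remote-read response
// frame by frame until EOF. Note the read-limit constant: it moved
// from remote.DefaultChunkedReadLimit to config.DefaultChunkedReadLimit.
func drainChunkedReadResponse(body io.Reader) error {
	// Passing nil as the scratch buffer lets the reader allocate one;
	// vmctl reuses a pooled buffer here instead.
	stream := remote.NewChunkedReader(body, config.DefaultChunkedReadLimit, nil)
	for {
		res := &prompb.ChunkedReadResponse{}
		err := stream.NextProto(res)
		if err == io.EOF {
			return nil // stream fully consumed
		}
		if err != nil {
			return fmt.Errorf("failed to read next protobuf frame: %w", err)
		}
		for _, series := range res.ChunkedSeries {
			fmt.Printf("got series with %d labels and %d chunks\n",
				len(series.Labels), len(series.Chunks))
		}
	}
}

The remote.NewChunkedReader signature itself is unchanged by the update; as the diff above shows, the call site in processStreamResponse differs from the pre-update code by exactly one token, remote.DefaultChunkedReadLimit becoming config.DefaultChunkedReadLimit.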