From 99fbb2948ba6b4597c7744f8f864002a4bdfcd5a Mon Sep 17 00:00:00 2001 From: Dmytro Kozlov <kozlovdmitriyy@gmail.com> Date: Thu, 12 Jan 2023 09:00:10 +0200 Subject: [PATCH] app/vmctl: add remote read protocol integration tests (#3626) --- app/vmctl/remote_read_test.go | 319 +++++++++++++++ .../remote_read_server.go | 366 ++++++++++++++++++ .../remote_write_server.go | 86 ++++ 3 files changed, 771 insertions(+) create mode 100644 app/vmctl/remote_read_test.go create mode 100644 app/vmctl/testdata/servers_integration_test/remote_read_server.go create mode 100644 app/vmctl/testdata/servers_integration_test/remote_write_server.go diff --git a/app/vmctl/remote_read_test.go b/app/vmctl/remote_read_test.go new file mode 100644 index 000000000..1fa0c4b5d --- /dev/null +++ b/app/vmctl/remote_read_test.go @@ -0,0 +1,319 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + "github.com/prometheus/prometheus/prompb" +) + +func TestRemoteRead(t *testing.T) { + + var testCases = []struct { + name string + remoteReadConfig remoteread.Config + vmCfg vm.Config + start string + end string + numOfSamples int64 + numOfSeries int64 + rrp remoteReadProcessor + chunk string + remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries + expectedSeries []vm.TimeSeries + }{ + { + name: "step minute on minute time range", + remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, + vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + start: "2022-11-26T11:23:05+02:00", + end: "2022-11-26T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMinute, + remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{0, 0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{100, 100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{200, 200}, + }, + }, + }, + { + name: "step month on month time range", + remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, + vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + start: "2022-09-26T11:23:05+02:00", + end: "2022-11-26T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMonth, + remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{200}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{200}}, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + remoteReadServer := remote_read_integration.NewRemoteReadServer(t) + defer remoteReadServer.Close() + remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) + defer remoteWriteServer.Close() + + tt.remoteReadConfig.Addr = remoteReadServer.URL() + + rr, err := remoteread.NewClient(tt.remoteReadConfig) + if err != nil { + t.Fatalf("error create remote read client: %s", err) + } + + start, err := time.Parse(time.RFC3339, tt.start) + if err != nil { + t.Fatalf("Error parse start time: %s", err) + } + + end, err := time.Parse(time.RFC3339, tt.end) + if err != nil { + t.Fatalf("Error parse end time: %s", err) + } + + rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) + + remoteReadServer.SetRemoteReadSeries(rrs) + remoteWriteServer.ExpectedSeries(tt.expectedSeries) + + tt.vmCfg.Addr = remoteWriteServer.URL() + + importer, err := vm.NewImporter(tt.vmCfg) + if err != nil { + t.Fatalf("failed to create VM importer: %s", err) + } + defer importer.Close() + + rmp := remoteReadProcessor{ + src: rr, + dst: importer, + filter: remoteReadFilter{ + timeStart: &start, + timeEnd: &end, + chunk: tt.chunk, + }, + cc: 1, + } + + ctx := context.Background() + err = rmp.run(ctx, true, false) + if err != nil { + t.Fatalf("failed to run remote read processor: %s", err) + } + }) + } +} + +func TestSteamRemoteRead(t *testing.T) { + + var testCases = []struct { + name string + remoteReadConfig remoteread.Config + vmCfg vm.Config + start string + end string + numOfSamples int64 + numOfSeries int64 + rrp remoteReadProcessor + chunk string + remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries + expectedSeries []vm.TimeSeries + }{ + { + name: "step minute on minute time range", + remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, + vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + start: "2022-11-26T11:23:05+02:00", + end: "2022-11-26T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMinute, + remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{0, 0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{100, 100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1669454585000, 1669454615000}, + Values: []float64{200, 200}, + }, + }, + }, + { + name: "step month on month time range", + remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, + vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, + start: "2022-09-26T11:23:05+02:00", + end: "2022-11-26T11:24:05+02:00", + numOfSamples: 2, + numOfSeries: 3, + chunk: stepper.StepMonth, + remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, + expectedSeries: []vm.TimeSeries{ + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1664184185000}, + Values: []float64{200}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{0}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{100}, + }, + { + Name: "vm_metric_1", + LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, + Timestamps: []int64{1666819415000}, + Values: []float64{200}}, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t) + defer remoteReadServer.Close() + remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) + defer remoteWriteServer.Close() + + tt.remoteReadConfig.Addr = remoteReadServer.URL() + + rr, err := remoteread.NewClient(tt.remoteReadConfig) + if err != nil { + t.Fatalf("error create remote read client: %s", err) + } + + start, err := time.Parse(time.RFC3339, tt.start) + if err != nil { + t.Fatalf("Error parse start time: %s", err) + } + + end, err := time.Parse(time.RFC3339, tt.end) + if err != nil { + t.Fatalf("Error parse end time: %s", err) + } + + rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) + + remoteReadServer.InitMockStorage(rrs) + remoteWriteServer.ExpectedSeries(tt.expectedSeries) + + tt.vmCfg.Addr = remoteWriteServer.URL() + + importer, err := vm.NewImporter(tt.vmCfg) + if err != nil { + t.Fatalf("failed to create VM importer: %s", err) + } + defer importer.Close() + + rmp := remoteReadProcessor{ + src: rr, + dst: importer, + filter: remoteReadFilter{ + timeStart: &start, + timeEnd: &end, + chunk: tt.chunk, + }, + cc: 1, + } + + ctx := context.Background() + err = rmp.run(ctx, true, false) + if err != nil { + t.Fatalf("failed to run remote read processor: %s", err) + } + }) + } +} diff --git a/app/vmctl/testdata/servers_integration_test/remote_read_server.go b/app/vmctl/testdata/servers_integration_test/remote_read_server.go new file mode 100644 index 000000000..aa8645333 --- /dev/null +++ b/app/vmctl/testdata/servers_integration_test/remote_read_server.go @@ -0,0 +1,366 @@ +package remote_read_integration + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" +) + +const ( + maxBytesInFrame = 1024 * 1024 +) + +type RemoteReadServer struct { + server *httptest.Server + series []*prompb.TimeSeries + storage *MockStorage +} + +// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the +// passed series based on the request to the read endpoint. It returns a server which should be closed after +// being used. +func NewRemoteReadServer(t *testing.T) *RemoteReadServer { + rrs := &RemoteReadServer{ + series: make([]*prompb.TimeSeries, 0), + } + rrs.server = httptest.NewServer(rrs.getReadHandler(t)) + return rrs +} + +// Close closes the server. +func (rrs *RemoteReadServer) Close() { + rrs.server.Close() +} + +func (rrs *RemoteReadServer) URL() string { + return rrs.server.URL +} + +func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) { + rrs.series = append(rrs.series, series...) +} + +func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !validateReadHeaders(t, r) { + t.Fatalf("invalid read headers") + } + + compressed, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("error read body: %s", err) + } + + reqBuf, err := snappy.Decode(nil, compressed) + if err != nil { + t.Fatalf("error decode compressed data:%s", err) + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + t.Fatalf("error unmarshal read request: %s", err) + } + + resp := &prompb.ReadResponse{ + Results: make([]*prompb.QueryResult, len(req.Queries)), + } + + for i, r := range req.Queries { + startTs := r.StartTimestampMs + endTs := r.EndTimestampMs + ts := make([]*prompb.TimeSeries, len(rrs.series)) + for i, s := range rrs.series { + var samples []prompb.Sample + for _, sample := range s.Samples { + if sample.Timestamp >= startTs && sample.Timestamp < endTs { + samples = append(samples, sample) + } + } + var series prompb.TimeSeries + if len(samples) > 0 { + series.Labels = s.Labels + series.Samples = samples + } + ts[i] = &series + } + + resp.Results[i] = &prompb.QueryResult{Timeseries: ts} + data, err := proto.Marshal(resp) + if err != nil { + t.Fatalf("error marshal response: %s", err) + } + + compressed = snappy.Encode(nil, data) + + w.Header().Set("Content-Type", "application/x-protobuf") + w.Header().Set("Content-Encoding", "snappy") + w.WriteHeader(http.StatusOK) + + if _, err := w.Write(compressed); err != nil { + t.Fatalf("snappy encode error: %s", err) + } + } + }) +} + +func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer { + rrs := &RemoteReadServer{ + series: make([]*prompb.TimeSeries, 0), + } + rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t)) + return rrs +} + +func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) { + rrs.storage = NewMockStorage(series) +} + +func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !validateStreamReadHeaders(t, r) { + t.Fatalf("invalid read headers") + } + + f, ok := w.(http.Flusher) + if !ok { + t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface") + } + + stream := remote.NewChunkedWriter(w, f) + + data, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("error read body: %s", err) + } + + decodedData, err := snappy.Decode(nil, data) + if err != nil { + t.Fatalf("error decode compressed data:%s", err) + } + + var req prompb.ReadRequest + if err := proto.Unmarshal(decodedData, &req); err != nil { + t.Fatalf("error unmarshal read request: %s", err) + } + + var chunks []prompb.Chunk + ctx := context.Background() + for idx, r := range req.Queries { + startTs := r.StartTimestampMs + endTs := r.EndTimestampMs + + var matchers []*labels.Matcher + cb := func() (int64, error) { return 0, nil } + + c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb) + + q, err := c.ChunkQuerier(ctx, startTs, endTs) + if err != nil { + t.Fatalf("error init chunk querier: %s", err) + } + + ss := q.Select(false, nil, matchers...) + for ss.Next() { + series := ss.At() + iter := series.Iterator() + labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil) + + frameBytesLeft := maxBytesInFrame + for _, lb := range labels { + frameBytesLeft -= lb.Size() + } + + isNext := iter.Next() + + for isNext { + chunk := iter.At() + + if chunk.Chunk == nil { + t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref) + } + + chunks = append(chunks, prompb.Chunk{ + MinTimeMs: chunk.MinTime, + MaxTimeMs: chunk.MaxTime, + Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()), + Data: chunk.Chunk.Bytes(), + }) + + frameBytesLeft -= chunks[len(chunks)-1].Size() + + // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. + isNext = iter.Next() + if frameBytesLeft > 0 && isNext { + continue + } + + resp := &prompb.ChunkedReadResponse{ + ChunkedSeries: []*prompb.ChunkedSeries{ + {Labels: labels, Chunks: chunks}, + }, + QueryIndex: int64(idx), + } + + b, err := proto.Marshal(resp) + if err != nil { + t.Fatalf("error marshal response: %s", err) + } + + if _, err := stream.Write(b); err != nil { + t.Fatalf("error write to stream: %s", err) + } + chunks = chunks[:0] + rrs.storage.Reset() + } + if err := iter.Err(); err != nil { + t.Fatalf("error iterate over chunk series: %s", err) + } + } + } + }) +} + +func validateReadHeaders(t *testing.T, r *http.Request) bool { + if r.Method != http.MethodPost { + t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) + } + if r.Header.Get("Content-Encoding") != "snappy" { + t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") + } + if r.Header.Get("Content-Type") != "application/x-protobuf" { + t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf") + } + + remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") + if remoteReadVersion == "" { + t.Fatalf("got empty prometheus remote read header") + } + if !strings.HasPrefix(remoteReadVersion, "0.1.") { + t.Fatalf("wrong remote version defined") + } + + return true +} + +func validateStreamReadHeaders(t *testing.T, r *http.Request) bool { + if r.Method != http.MethodPost { + t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) + } + if r.Header.Get("Content-Encoding") != "snappy" { + t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") + } + if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" { + t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") + } + + remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") + if remoteReadVersion == "" { + t.Fatalf("got empty prometheus remote read header") + } + if !strings.HasPrefix(remoteReadVersion, "0.1.") { + t.Fatalf("wrong remote version defined") + } + return true +} + +func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries { + var ts []*prompb.TimeSeries + j := 0 + for i := 0; i < int(numOfSeries); i++ { + if i%3 == 0 { + j++ + } + + timeSeries := prompb.TimeSeries{ + Labels: []prompb.Label{ + {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)}, + {Name: "job", Value: strconv.Itoa(i)}, + }, + } + + ts = append(ts, &timeSeries) + } + + for i := range ts { + ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples) + } + + return ts +} + +func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample { + samples := make([]prompb.Sample, 0) + delta := (endTime - startTime) / numOfSamples + + t := startTime + for t != endTime { + v := 100 * int64(idx) + samples = append(samples, prompb.Sample{ + Timestamp: t * 1000, + Value: float64(v), + }) + t = t + delta + } + + return samples +} + +type MockStorage struct { + query *prompb.Query + store []*prompb.TimeSeries +} + +func NewMockStorage(series []*prompb.TimeSeries) *MockStorage { + return &MockStorage{store: series} +} + +func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { + if ms.query != nil { + return nil, fmt.Errorf("expected only one call to remote client got: %v", query) + } + ms.query = query + + q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))} + for _, s := range ms.store { + var samples []prompb.Sample + for _, sample := range s.Samples { + if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs { + samples = append(samples, sample) + } + } + var series prompb.TimeSeries + if len(samples) > 0 { + series.Labels = s.Labels + series.Samples = samples + } + + q.Timeseries = append(q.Timeseries, &series) + } + return q, nil +} + +func (ms *MockStorage) Reset() { + ms.query = nil +} + +func labelsToLabelsProto(labels labels.Labels) []prompb.Label { + result := make([]prompb.Label, 0, len(labels)) + for _, l := range labels { + result = append(result, prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + return result +} diff --git a/app/vmctl/testdata/servers_integration_test/remote_write_server.go b/app/vmctl/testdata/servers_integration_test/remote_write_server.go new file mode 100644 index 000000000..9671ddb94 --- /dev/null +++ b/app/vmctl/testdata/servers_integration_test/remote_write_server.go @@ -0,0 +1,86 @@ +package remote_read_integration + +import ( + "bufio" + "net/http" + "net/http/httptest" + "reflect" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" +) + +type RemoteWriteServer struct { + server *httptest.Server + series []vm.TimeSeries +} + +// NewRemoteWriteServer prepares test remote write server +func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer { + rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)} + mux := http.NewServeMux() + mux.Handle("/api/v1/import", rws.getWriteHandler(t)) + mux.Handle("/health", rws.handlePing()) + rws.server = httptest.NewServer(mux) + return rws +} + +// Close closes the server. +func (rws *RemoteWriteServer) Close() { + rws.server.Close() +} + +func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) { + rws.series = append(rws.series, series...) +} + +func (rws *RemoteWriteServer) URL() string { + return rws.server.URL +} + +func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var tss []vm.TimeSeries + scanner := bufio.NewScanner(r.Body) + var rows parser.Rows + for scanner.Scan() { + + rows.Unmarshal(scanner.Text()) + for _, row := range rows.Rows { + var labelPairs []vm.LabelPair + var ts vm.TimeSeries + nameValue := "" + for _, tag := range row.Tags { + if string(tag.Key) == "__name__" { + nameValue = string(tag.Value) + continue + } + labelPairs = append(labelPairs, vm.LabelPair{Name: string(tag.Key), Value: string(tag.Value)}) + } + + ts.Values = append(ts.Values, row.Values...) + ts.Timestamps = append(ts.Timestamps, row.Timestamps...) + ts.Name = nameValue + ts.LabelPairs = labelPairs + tss = append(tss, ts) + } + rows.Reset() + } + + if !reflect.DeepEqual(tss, rws.series) { + w.WriteHeader(http.StatusInternalServerError) + t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.series, tss) + return + } + + w.WriteHeader(http.StatusNoContent) + }) +} + +func (rws *RemoteWriteServer) handlePing() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + }) +}