diff --git a/app/vmctl/remote_read_test.go b/app/vmctl/remote_read_test.go deleted file mode 100644 index bfb19b2cd1..0000000000 --- a/app/vmctl/remote_read_test.go +++ /dev/null @@ -1,319 +0,0 @@ -package main - -import ( - "context" - "testing" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/remoteread" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" - "github.com/prometheus/prometheus/prompb" -) - -func TestRemoteRead(t *testing.T) { - - var testCases = []struct { - name string - remoteReadConfig remoteread.Config - vmCfg vm.Config - start string - end string - numOfSamples int64 - numOfSeries int64 - rrp remoteReadProcessor - chunk string - remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries - expectedSeries []vm.TimeSeries - }{ - { - name: "step minute on minute time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, - start: "2022-11-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMinute, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{0, 0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{100, 100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{200, 200}, - }, - }, - }, - { - name: "step month on month time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*"}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, - start: "2022-09-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMonth, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{200}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{200}}, - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - remoteReadServer := remote_read_integration.NewRemoteReadServer(t) - defer remoteReadServer.Close() - remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) - defer remoteWriteServer.Close() - - tt.remoteReadConfig.Addr = remoteReadServer.URL() - - rr, err := remoteread.NewClient(tt.remoteReadConfig) - if err != nil { - t.Fatalf("error create remote read client: %s", err) - } - - start, err := time.Parse(time.RFC3339, tt.start) - if err != nil { - t.Fatalf("Error parse start time: %s", err) - } - - end, err := time.Parse(time.RFC3339, tt.end) - if err != nil { - t.Fatalf("Error parse end time: %s", err) - } - - rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) - - remoteReadServer.SetRemoteReadSeries(rrs) - remoteWriteServer.ExpectedSeries(tt.expectedSeries) - - tt.vmCfg.Addr = remoteWriteServer.URL() - - importer, err := vm.NewImporter(ctx, tt.vmCfg) - if err != nil { - t.Fatalf("failed to create VM importer: %s", err) - } - defer importer.Close() - - rmp := remoteReadProcessor{ - src: rr, - dst: importer, - filter: remoteReadFilter{ - timeStart: &start, - timeEnd: &end, - chunk: tt.chunk, - }, - cc: 1, - } - - err = rmp.run(ctx, true, false) - if err != nil { - t.Fatalf("failed to run remote read processor: %s", err) - } - }) - } -} - -func TestSteamRemoteRead(t *testing.T) { - - var testCases = []struct { - name string - remoteReadConfig remoteread.Config - vmCfg vm.Config - start string - end string - numOfSamples int64 - numOfSeries int64 - rrp remoteReadProcessor - chunk string - remoteReadSeries func(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries - expectedSeries []vm.TimeSeries - }{ - { - name: "step minute on minute time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, - start: "2022-11-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMinute, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{0, 0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{100, 100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1669454585000, 1669454615000}, - Values: []float64{200, 200}, - }, - }, - }, - { - name: "step month on month time range", - remoteReadConfig: remoteread.Config{Addr: "", LabelName: "__name__", LabelValue: ".*", UseStream: true}, - vmCfg: vm.Config{Addr: "", Concurrency: 1, DisableProgressBar: true}, - start: "2022-09-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMonth, - remoteReadSeries: remote_read_integration.GenerateRemoteReadSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{200}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{200}}, - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - remoteReadServer := remote_read_integration.NewRemoteReadStreamServer(t) - defer remoteReadServer.Close() - remoteWriteServer := remote_read_integration.NewRemoteWriteServer(t) - defer remoteWriteServer.Close() - - tt.remoteReadConfig.Addr = remoteReadServer.URL() - - rr, err := remoteread.NewClient(tt.remoteReadConfig) - if err != nil { - t.Fatalf("error create remote read client: %s", err) - } - - start, err := time.Parse(time.RFC3339, tt.start) - if err != nil { - t.Fatalf("Error parse start time: %s", err) - } - - end, err := time.Parse(time.RFC3339, tt.end) - if err != nil { - t.Fatalf("Error parse end time: %s", err) - } - - rrs := tt.remoteReadSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) - - remoteReadServer.InitMockStorage(rrs) - remoteWriteServer.ExpectedSeries(tt.expectedSeries) - - tt.vmCfg.Addr = remoteWriteServer.URL() - - importer, err := vm.NewImporter(ctx, tt.vmCfg) - if err != nil { - t.Fatalf("failed to create VM importer: %s", err) - } - defer importer.Close() - - rmp := remoteReadProcessor{ - src: rr, - dst: importer, - filter: remoteReadFilter{ - timeStart: &start, - timeEnd: &end, - chunk: tt.chunk, - }, - cc: 1, - } - - err = rmp.run(ctx, true, false) - if err != nil { - t.Fatalf("failed to run remote read processor: %s", err) - } - }) - } -} diff --git a/app/vmctl/testdata/servers_integration_test/remote_read_server.go b/app/vmctl/testdata/servers_integration_test/remote_read_server.go deleted file mode 100644 index eca4183ae8..0000000000 --- a/app/vmctl/testdata/servers_integration_test/remote_read_server.go +++ /dev/null @@ -1,368 +0,0 @@ -package remote_read_integration - -import ( - "context" - "fmt" - "io" - "net/http" - "net/http/httptest" - "strconv" - "strings" - "testing" - - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/prompb" - "github.com/prometheus/prometheus/storage/remote" - "github.com/prometheus/prometheus/tsdb/chunks" -) - -const ( - maxBytesInFrame = 1024 * 1024 -) - -type RemoteReadServer struct { - server *httptest.Server - series []*prompb.TimeSeries - storage *MockStorage -} - -// NewRemoteReadServer creates a remote read server. It exposes a single endpoint and responds with the -// passed series based on the request to the read endpoint. It returns a server which should be closed after -// being used. -func NewRemoteReadServer(t *testing.T) *RemoteReadServer { - rrs := &RemoteReadServer{ - series: make([]*prompb.TimeSeries, 0), - } - rrs.server = httptest.NewServer(rrs.getReadHandler(t)) - return rrs -} - -// Close closes the server. -func (rrs *RemoteReadServer) Close() { - rrs.server.Close() -} - -func (rrs *RemoteReadServer) URL() string { - return rrs.server.URL -} - -func (rrs *RemoteReadServer) SetRemoteReadSeries(series []*prompb.TimeSeries) { - rrs.series = append(rrs.series, series...) -} - -func (rrs *RemoteReadServer) getReadHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !validateReadHeaders(t, r) { - t.Fatalf("invalid read headers") - } - - compressed, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("error read body: %s", err) - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - t.Fatalf("error decode compressed data:%s", err) - } - - var req prompb.ReadRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - t.Fatalf("error unmarshal read request: %s", err) - } - - resp := &prompb.ReadResponse{ - Results: make([]*prompb.QueryResult, len(req.Queries)), - } - - for i, r := range req.Queries { - startTs := r.StartTimestampMs - endTs := r.EndTimestampMs - ts := make([]*prompb.TimeSeries, len(rrs.series)) - for i, s := range rrs.series { - var samples []prompb.Sample - for _, sample := range s.Samples { - if sample.Timestamp >= startTs && sample.Timestamp < endTs { - samples = append(samples, sample) - } - } - var series prompb.TimeSeries - if len(samples) > 0 { - series.Labels = s.Labels - series.Samples = samples - } - ts[i] = &series - } - - resp.Results[i] = &prompb.QueryResult{Timeseries: ts} - data, err := proto.Marshal(resp) - if err != nil { - t.Fatalf("error marshal response: %s", err) - } - - compressed = snappy.Encode(nil, data) - - w.Header().Set("Content-Type", "application/x-protobuf") - w.Header().Set("Content-Encoding", "snappy") - w.WriteHeader(http.StatusOK) - - if _, err := w.Write(compressed); err != nil { - t.Fatalf("snappy encode error: %s", err) - } - } - }) -} - -func NewRemoteReadStreamServer(t *testing.T) *RemoteReadServer { - rrs := &RemoteReadServer{ - series: make([]*prompb.TimeSeries, 0), - } - rrs.server = httptest.NewServer(rrs.getStreamReadHandler(t)) - return rrs -} - -func (rrs *RemoteReadServer) InitMockStorage(series []*prompb.TimeSeries) { - rrs.storage = NewMockStorage(series) -} - -func (rrs *RemoteReadServer) getStreamReadHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !validateStreamReadHeaders(t, r) { - t.Fatalf("invalid read headers") - } - - f, ok := w.(http.Flusher) - if !ok { - t.Fatalf("internal http.ResponseWriter does not implement http.Flusher interface") - } - - stream := remote.NewChunkedWriter(w, f) - - data, err := io.ReadAll(r.Body) - if err != nil { - t.Fatalf("error read body: %s", err) - } - - decodedData, err := snappy.Decode(nil, data) - if err != nil { - t.Fatalf("error decode compressed data:%s", err) - } - - var req prompb.ReadRequest - if err := proto.Unmarshal(decodedData, &req); err != nil { - t.Fatalf("error unmarshal read request: %s", err) - } - - var chks []prompb.Chunk - ctx := context.Background() - for idx, r := range req.Queries { - startTs := r.StartTimestampMs - endTs := r.EndTimestampMs - - var matchers []*labels.Matcher - cb := func() (int64, error) { return 0, nil } - - c := remote.NewSampleAndChunkQueryableClient(rrs.storage, nil, matchers, true, cb) - - q, err := c.ChunkQuerier(ctx, startTs, endTs) - if err != nil { - t.Fatalf("error init chunk querier: %s", err) - } - - ss := q.Select(false, nil, matchers...) - var iter chunks.Iterator - for ss.Next() { - series := ss.At() - iter = series.Iterator(iter) - labels := remote.MergeLabels(labelsToLabelsProto(series.Labels()), nil) - - frameBytesLeft := maxBytesInFrame - for _, lb := range labels { - frameBytesLeft -= lb.Size() - } - - isNext := iter.Next() - - for isNext { - chunk := iter.At() - - if chunk.Chunk == nil { - t.Fatalf("error found not populated chunk returned by SeriesSet at ref: %v", chunk.Ref) - } - - chks = append(chks, prompb.Chunk{ - MinTimeMs: chunk.MinTime, - MaxTimeMs: chunk.MaxTime, - Type: prompb.Chunk_Encoding(chunk.Chunk.Encoding()), - Data: chunk.Chunk.Bytes(), - }) - - frameBytesLeft -= chks[len(chks)-1].Size() - - // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. - isNext = iter.Next() - if frameBytesLeft > 0 && isNext { - continue - } - - resp := &prompb.ChunkedReadResponse{ - ChunkedSeries: []*prompb.ChunkedSeries{ - {Labels: labels, Chunks: chks}, - }, - QueryIndex: int64(idx), - } - - b, err := proto.Marshal(resp) - if err != nil { - t.Fatalf("error marshal response: %s", err) - } - - if _, err := stream.Write(b); err != nil { - t.Fatalf("error write to stream: %s", err) - } - chks = chks[:0] - rrs.storage.Reset() - } - if err := iter.Err(); err != nil { - t.Fatalf("error iterate over chunk series: %s", err) - } - } - } - }) -} - -func validateReadHeaders(t *testing.T, r *http.Request) bool { - if r.Method != http.MethodPost { - t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) - } - if r.Header.Get("Content-Encoding") != "snappy" { - t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") - } - if r.Header.Get("Content-Type") != "application/x-protobuf" { - t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-protobuf") - } - - remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") - if remoteReadVersion == "" { - t.Fatalf("got empty prometheus remote read header") - } - if !strings.HasPrefix(remoteReadVersion, "0.1.") { - t.Fatalf("wrong remote version defined") - } - - return true -} - -func validateStreamReadHeaders(t *testing.T, r *http.Request) bool { - if r.Method != http.MethodPost { - t.Fatalf("got %q method, expected %q", r.Method, http.MethodPost) - } - if r.Header.Get("Content-Encoding") != "snappy" { - t.Fatalf("got %q content encoding header, expected %q", r.Header.Get("Content-Encoding"), "snappy") - } - if r.Header.Get("Content-Type") != "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" { - t.Fatalf("got %q content type header, expected %q", r.Header.Get("Content-Type"), "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") - } - - remoteReadVersion := r.Header.Get("X-Prometheus-Remote-Read-Version") - if remoteReadVersion == "" { - t.Fatalf("got empty prometheus remote read header") - } - if !strings.HasPrefix(remoteReadVersion, "0.1.") { - t.Fatalf("wrong remote version defined") - } - return true -} - -func GenerateRemoteReadSeries(start, end, numOfSeries, numOfSamples int64) []*prompb.TimeSeries { - var ts []*prompb.TimeSeries - j := 0 - for i := 0; i < int(numOfSeries); i++ { - if i%3 == 0 { - j++ - } - - timeSeries := prompb.TimeSeries{ - Labels: []prompb.Label{ - {Name: labels.MetricName, Value: fmt.Sprintf("vm_metric_%d", j)}, - {Name: "job", Value: strconv.Itoa(i)}, - }, - } - - ts = append(ts, &timeSeries) - } - - for i := range ts { - ts[i].Samples = generateRemoteReadSamples(i, start, end, numOfSamples) - } - - return ts -} - -func generateRemoteReadSamples(idx int, startTime, endTime, numOfSamples int64) []prompb.Sample { - samples := make([]prompb.Sample, 0) - delta := (endTime - startTime) / numOfSamples - - t := startTime - for t != endTime { - v := 100 * int64(idx) - samples = append(samples, prompb.Sample{ - Timestamp: t * 1000, - Value: float64(v), - }) - t = t + delta - } - - return samples -} - -type MockStorage struct { - query *prompb.Query - store []*prompb.TimeSeries -} - -func NewMockStorage(series []*prompb.TimeSeries) *MockStorage { - return &MockStorage{store: series} -} - -func (ms *MockStorage) Read(_ context.Context, query *prompb.Query) (*prompb.QueryResult, error) { - if ms.query != nil { - return nil, fmt.Errorf("expected only one call to remote client got: %v", query) - } - ms.query = query - - q := &prompb.QueryResult{Timeseries: make([]*prompb.TimeSeries, 0, len(ms.store))} - for _, s := range ms.store { - var samples []prompb.Sample - for _, sample := range s.Samples { - if sample.Timestamp >= query.StartTimestampMs && sample.Timestamp < query.EndTimestampMs { - samples = append(samples, sample) - } - } - var series prompb.TimeSeries - if len(samples) > 0 { - series.Labels = s.Labels - series.Samples = samples - } - - q.Timeseries = append(q.Timeseries, &series) - } - return q, nil -} - -func (ms *MockStorage) Reset() { - ms.query = nil -} - -func labelsToLabelsProto(labels labels.Labels) []prompb.Label { - result := make([]prompb.Label, 0, len(labels)) - for _, l := range labels { - result = append(result, prompb.Label{ - Name: l.Name, - Value: l.Value, - }) - } - return result -} diff --git a/app/vmctl/testdata/servers_integration_test/remote_write_server.go b/app/vmctl/testdata/servers_integration_test/remote_write_server.go deleted file mode 100644 index e6525af1de..0000000000 --- a/app/vmctl/testdata/servers_integration_test/remote_write_server.go +++ /dev/null @@ -1,261 +0,0 @@ -package remote_read_integration - -import ( - "bufio" - "encoding/json" - "fmt" - "log" - "net/http" - "net/http/httptest" - "reflect" - "sort" - "strconv" - "testing" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native/stream" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" -) - -// LabelValues represents series from api/v1/series response -type LabelValues map[string]string - -// Response represents response from api/v1/series -type Response struct { - Status string `json:"status"` - Series []LabelValues `json:"data"` -} - -// RemoteWriteServer represents fake remote write server with database -type RemoteWriteServer struct { - server *httptest.Server - series []vm.TimeSeries - expectedSeries []vm.TimeSeries -} - -// NewRemoteWriteServer prepares test remote write server -func NewRemoteWriteServer(t *testing.T) *RemoteWriteServer { - rws := &RemoteWriteServer{series: make([]vm.TimeSeries, 0)} - mux := http.NewServeMux() - - mux.Handle("/api/v1/import", rws.getWriteHandler(t)) - mux.Handle("/health", rws.handlePing()) - mux.Handle("/api/v1/series", rws.seriesHandler()) - mux.Handle("/api/v1/export/native", rws.exportNativeHandler()) - mux.Handle("/api/v1/import/native", rws.importNativeHandler(t)) - rws.server = httptest.NewServer(mux) - return rws -} - -// Close closes the server -func (rws *RemoteWriteServer) Close() { - rws.server.Close() -} - -// Series saves generated series for fake database -func (rws *RemoteWriteServer) Series(series []vm.TimeSeries) { - rws.series = append(rws.series, series...) -} - -// ExpectedSeries saves expected results to check in the handler -func (rws *RemoteWriteServer) ExpectedSeries(series []vm.TimeSeries) { - rws.expectedSeries = append(rws.expectedSeries, series...) -} - -// URL returns server url -func (rws *RemoteWriteServer) URL() string { - return rws.server.URL -} - -func (rws *RemoteWriteServer) getWriteHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var tss []vm.TimeSeries - scanner := bufio.NewScanner(r.Body) - var rows parser.Rows - for scanner.Scan() { - - rows.Unmarshal(scanner.Text()) - for _, row := range rows.Rows { - var labelPairs []vm.LabelPair - var ts vm.TimeSeries - nameValue := "" - for _, tag := range row.Tags { - if string(tag.Key) == "__name__" { - nameValue = string(tag.Value) - continue - } - labelPairs = append(labelPairs, vm.LabelPair{Name: string(tag.Key), Value: string(tag.Value)}) - } - - ts.Values = append(ts.Values, row.Values...) - ts.Timestamps = append(ts.Timestamps, row.Timestamps...) - ts.Name = nameValue - ts.LabelPairs = labelPairs - tss = append(tss, ts) - } - rows.Reset() - } - - if !reflect.DeepEqual(tss, rws.expectedSeries) { - w.WriteHeader(http.StatusInternalServerError) - t.Fatalf("datasets not equal, expected: %#v; \n got: %#v", rws.expectedSeries, tss) - return - } - - w.WriteHeader(http.StatusNoContent) - return - }) -} - -func (rws *RemoteWriteServer) handlePing() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("OK")) - }) -} - -func (rws *RemoteWriteServer) seriesHandler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var labelValues []LabelValues - for _, ser := range rws.series { - metricNames := make(LabelValues) - if ser.Name != "" { - metricNames["__name__"] = ser.Name - } - for _, p := range ser.LabelPairs { - metricNames[p.Name] = p.Value - } - labelValues = append(labelValues, metricNames) - } - - resp := Response{ - Status: "success", - Series: labelValues, - } - - err := json.NewEncoder(w).Encode(resp) - if err != nil { - log.Printf("error send series: %s", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - }) -} - -func (rws *RemoteWriteServer) exportNativeHandler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - now := time.Now() - err := prometheus.ExportNativeHandler(now, w, r) - if err != nil { - log.Printf("error export series via native protocol: %s", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusNoContent) - return - }) -} - -func (rws *RemoteWriteServer) importNativeHandler(t *testing.T) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - common.StartUnmarshalWorkers() - defer common.StopUnmarshalWorkers() - - var gotTimeSeries []vm.TimeSeries - - err := stream.Parse(r.Body, false, func(block *stream.Block) error { - mn := &block.MetricName - var timeseries vm.TimeSeries - timeseries.Name = string(mn.MetricGroup) - timeseries.Timestamps = append(timeseries.Timestamps, block.Timestamps...) - timeseries.Values = append(timeseries.Values, block.Values...) - - for i := range mn.Tags { - tag := &mn.Tags[i] - timeseries.LabelPairs = append(timeseries.LabelPairs, vm.LabelPair{ - Name: string(tag.Key), - Value: string(tag.Value), - }) - } - - gotTimeSeries = append(gotTimeSeries, timeseries) - - return nil - }) - if err != nil { - log.Printf("error parse stream blocks: %s", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - // got timeseries should be sorted - // because they are processed independently - sort.SliceStable(gotTimeSeries, func(i, j int) bool { - iv, jv := gotTimeSeries[i], gotTimeSeries[j] - switch { - case iv.Values[0] != jv.Values[0]: - return iv.Values[0] < jv.Values[0] - case iv.Timestamps[0] != jv.Timestamps[0]: - return iv.Timestamps[0] < jv.Timestamps[0] - default: - return iv.Name < jv.Name - } - }) - - if !reflect.DeepEqual(gotTimeSeries, rws.expectedSeries) { - w.WriteHeader(http.StatusInternalServerError) - t.Fatalf("datasets not equal, expected: %#v;\n got: %#v", rws.expectedSeries, gotTimeSeries) - } - - w.WriteHeader(http.StatusNoContent) - return - }) -} - -// GenerateVNSeries generates test timeseries -func GenerateVNSeries(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries { - var ts []vm.TimeSeries - j := 0 - for i := 0; i < int(numOfSeries); i++ { - if i%3 == 0 { - j++ - } - - timeSeries := vm.TimeSeries{ - Name: fmt.Sprintf("vm_metric_%d", j), - LabelPairs: []vm.LabelPair{ - {Name: "job", Value: strconv.Itoa(i)}, - }, - } - - ts = append(ts, timeSeries) - } - - for i := range ts { - t, v := generateTimeStampsAndValues(i, start, end, numOfSamples) - ts[i].Timestamps = t - ts[i].Values = v - } - - return ts -} - -func generateTimeStampsAndValues(idx int, startTime, endTime, numOfSamples int64) ([]int64, []float64) { - delta := (endTime - startTime) / numOfSamples - - var timestamps []int64 - var values []float64 - t := startTime - for t != endTime { - v := 100 * int64(idx) - timestamps = append(timestamps, t*1000) - values = append(values, float64(v)) - t = t + delta - } - - return timestamps, values -} diff --git a/app/vmctl/vm_native_test.go b/app/vmctl/vm_native_test.go deleted file mode 100644 index 82038e352a..0000000000 --- a/app/vmctl/vm_native_test.go +++ /dev/null @@ -1,296 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "log" - "os" - "testing" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper" - remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" -) - -const ( - storagePath = "TestStorage" - retentionPeriod = "100y" -) - -func Test_vmNativeProcessor_run(t *testing.T) { - - processFlags() - vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) - defer func() { - vmstorage.Stop() - if err := os.RemoveAll(storagePath); err != nil { - log.Fatalf("cannot remove %q: %s", storagePath, err) - } - }() - - type fields struct { - filter native.Filter - dst *native.Client - src *native.Client - backoff *backoff.Backoff - s *stats - rateLimit int64 - interCluster bool - cc int - matchName string - matchValue string - } - type args struct { - ctx context.Context - silent bool - } - - tests := []struct { - name string - fields fields - args args - vmSeries func(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries - expectedSeries []vm.TimeSeries - start string - end string - numOfSamples int64 - numOfSeries int64 - chunk string - wantErr bool - }{ - { - name: "step minute on minute time range", - start: "2022-11-25T11:23:05+02:00", - end: "2022-11-27T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMinute, - fields: fields{ - filter: native.Filter{}, - backoff: backoff.New(), - rateLimit: 0, - interCluster: false, - cc: 1, - matchName: "__name__", - matchValue: ".*", - }, - args: args{ - ctx: context.Background(), - silent: true, - }, - vmSeries: remote_read_integration.GenerateVNSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1669368185000, 1669454615000}, - Values: []float64{0, 0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1669368185000, 1669454615000}, - Values: []float64{100, 100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1669368185000, 1669454615000}, - Values: []float64{200, 200}, - }, - }, - wantErr: false, - }, - { - name: "step month on month time range", - start: "2022-09-26T11:23:05+02:00", - end: "2022-11-26T11:24:05+02:00", - numOfSamples: 2, - numOfSeries: 3, - chunk: stepper.StepMonth, - fields: fields{ - filter: native.Filter{}, - backoff: backoff.New(), - rateLimit: 0, - interCluster: false, - cc: 1, - matchName: "__name__", - matchValue: ".*", - }, - args: args{ - ctx: context.Background(), - silent: true, - }, - vmSeries: remote_read_integration.GenerateVNSeries, - expectedSeries: []vm.TimeSeries{ - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{0}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{100}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1664184185000}, - Values: []float64{200}, - }, - { - Name: "vm_metric_1", - LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}}, - Timestamps: []int64{1666819415000}, - Values: []float64{200}, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - src := remote_read_integration.NewRemoteWriteServer(t) - dst := remote_read_integration.NewRemoteWriteServer(t) - - defer func() { - src.Close() - dst.Close() - }() - - start, err := time.Parse(time.RFC3339, tt.start) - if err != nil { - t.Fatalf("Error parse start time: %s", err) - } - - end, err := time.Parse(time.RFC3339, tt.end) - if err != nil { - t.Fatalf("Error parse end time: %s", err) - } - - tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue) - tt.fields.filter.TimeStart = tt.start - tt.fields.filter.TimeEnd = tt.end - - rws := tt.vmSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples) - - src.Series(rws) - dst.ExpectedSeries(tt.expectedSeries) - - if err := fillStorage(rws); err != nil { - t.Fatalf("error add series to storage: %s", err) - } - - tt.fields.src = &native.Client{ - AuthCfg: nil, - Addr: src.URL(), - ExtraLabels: []string{}, - DisableHTTPKeepAlive: false, - } - tt.fields.dst = &native.Client{ - AuthCfg: nil, - Addr: dst.URL(), - ExtraLabels: []string{}, - DisableHTTPKeepAlive: false, - } - - p := &vmNativeProcessor{ - filter: tt.fields.filter, - dst: tt.fields.dst, - src: tt.fields.src, - backoff: tt.fields.backoff, - s: tt.fields.s, - rateLimit: tt.fields.rateLimit, - interCluster: tt.fields.interCluster, - cc: tt.fields.cc, - } - - if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr { - t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) - } - deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue) - if err != nil { - t.Fatalf("error delete series: %s", err) - } - if int64(deleted) != tt.numOfSeries { - t.Fatalf("expected deleted series %d; got deleted series %d", tt.numOfSeries, deleted) - } - }) - } -} - -func processFlags() { - flag.Parse() - for _, fv := range []struct { - flag string - value string - }{ - {flag: "storageDataPath", value: storagePath}, - {flag: "retentionPeriod", value: retentionPeriod}, - } { - // panics if flag doesn't exist - if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil { - log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err) - } - } -} - -func fillStorage(series []vm.TimeSeries) error { - var mrs []storage.MetricRow - for _, series := range series { - var labels []prompb.Label - for _, lp := range series.LabelPairs { - labels = append(labels, prompb.Label{Name: []byte(lp.Name), Value: []byte(lp.Value)}) - } - if series.Name != "" { - labels = append(labels, prompb.Label{Name: []byte("__name__"), Value: []byte(series.Name)}) - } - mr := storage.MetricRow{} - mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels) - - timestamps := series.Timestamps - values := series.Values - for i, value := range values { - mr.Timestamp = timestamps[i] - mr.Value = value - mrs = append(mrs, mr) - } - } - - if err := vmstorage.AddRows(mrs); err != nil { - return fmt.Errorf("unexpected error in AddRows: %s", err) - } - vmstorage.Storage.DebugFlush() - return nil -} - -func deleteSeries(name, value string) (int, error) { - tfs := storage.NewTagFilters() - if err := tfs.Add([]byte(name), []byte(value), false, true); err != nil { - return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err) - } - return vmstorage.DeleteSeries(nil, []*storage.TagFilters{tfs}) -}