diff --git a/apptest/client.go b/apptest/client.go index bf63ed0b8..00926348d 100644 --- a/apptest/client.go +++ b/apptest/client.go @@ -1,6 +1,7 @@ package apptest import ( + "bytes" "io" "net/http" "net/url" @@ -36,13 +37,13 @@ func (c *Client) CloseConnections() { // the response body to the caller. func (c *Client) Get(t *testing.T, url string, wantStatusCode int) string { t.Helper() - return c.do(t, http.MethodGet, url, "", "", wantStatusCode) + return c.do(t, http.MethodGet, url, "", nil, wantStatusCode) } // Post sends a HTTP POST request. Once the function receives a response, it // checks whether the response status code matches the expected one and returns // the response body to the caller. -func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCode int) string { +func (c *Client) Post(t *testing.T, url, contentType string, data []byte, wantStatusCode int) string { t.Helper() return c.do(t, http.MethodPost, url, contentType, data, wantStatusCode) } @@ -52,16 +53,16 @@ func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCod // matches the expected one and returns the response body to the caller. func (c *Client) PostForm(t *testing.T, url string, data url.Values, wantStatusCode int) string { t.Helper() - return c.Post(t, url, "application/x-www-form-urlencoded", data.Encode(), wantStatusCode) + return c.Post(t, url, "application/x-www-form-urlencoded", []byte(data.Encode()), wantStatusCode) } // do prepares a HTTP request, sends it to the server, receives the response // from the server, ensures then response code matches the expected one, reads // the rentire response body and returns it to the caller. -func (c *Client) do(t *testing.T, method, url, contentType, data string, wantStatusCode int) string { +func (c *Client) do(t *testing.T, method, url, contentType string, data []byte, wantStatusCode int) string { t.Helper() - req, err := http.NewRequest(method, url, strings.NewReader(data)) + req, err := http.NewRequest(method, url, bytes.NewReader(data)) if err != nil { t.Fatalf("could not create a HTTP request: %v", err) } diff --git a/apptest/model.go b/apptest/model.go index f7a1902d3..2477a57e3 100644 --- a/apptest/model.go +++ b/apptest/model.go @@ -8,10 +8,13 @@ import ( "strings" "testing" "time" + + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) // PrometheusQuerier contains methods available to Prometheus-like HTTP API for Querying type PrometheusQuerier interface { + PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse PrometheusAPIV1Query(t *testing.T, query, time, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse PrometheusAPIV1QueryRange(t *testing.T, query, start, end, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse @@ -19,6 +22,7 @@ type PrometheusQuerier interface { // PrometheusWriter contains methods available to Prometheus-like HTTP API for Writing new data type PrometheusWriter interface { + PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, opts QueryOpts) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) } diff --git a/apptest/testcase.go b/apptest/testcase.go index f98998a67..2de70db96 100644 --- a/apptest/testcase.go +++ b/apptest/testcase.go @@ -54,6 +54,17 @@ func (tc *TestCase) Stop() { } } +// MustStartDefaultVmsingle is a test helper function that starts an instance of +// vmsingle with defaults suitable for most tests. +func (tc *TestCase) MustStartDefaultVmsingle() *Vmsingle { + tc.t.Helper() + + return tc.MustStartVmsingle("vmsingle", []string{ + "-storageDataPath=" + tc.Dir() + "/vmsingle", + "-retentionPeriod=100y", + }) +} + // MustStartVmsingle is a test helper function that starts an instance of // vmsingle and fails the test if the app fails to start. func (tc *TestCase) MustStartVmsingle(instance string, flags []string) *Vmsingle { @@ -118,7 +129,8 @@ func (c *vmcluster) ForceFlush(t *testing.T) { } } -// MustStartCluster is a typical cluster configuration. +// MustStartDefaultCluster is a typical cluster configuration suitable for most +// tests. // // The cluster consists of two vmstorages, one vminsert and one vmselect, no // data replication. @@ -128,7 +140,7 @@ func (c *vmcluster) ForceFlush(t *testing.T) { // vmselect) but instead just need a typical cluster configuration to verify // some business logic (such as API surface, or MetricsQL). Such cluster // tests usually come paired with corresponding vmsingle tests. -func (tc *TestCase) MustStartCluster() PrometheusWriteQuerier { +func (tc *TestCase) MustStartDefaultCluster() PrometheusWriteQuerier { tc.t.Helper() vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{ diff --git a/apptest/tests/key_concepts_test.go b/apptest/tests/key_concepts_test.go index 635d01ab1..8eeec4ef1 100644 --- a/apptest/tests/key_concepts_test.go +++ b/apptest/tests/key_concepts_test.go @@ -33,10 +33,7 @@ func TestSingleKeyConceptsQuery(t *testing.T) { tc := apptest.NewTestCase(t) defer tc.Stop() - sut := tc.MustStartVmsingle("vmsingle", []string{ - "-storageDataPath=" + tc.Dir() + "/vmstorage", - "-retentionPeriod=100y", - }) + sut := tc.MustStartDefaultVmsingle() testKeyConceptsQueryData(t, sut) } @@ -47,7 +44,7 @@ func TestClusterKeyConceptsQueryData(t *testing.T) { tc := apptest.NewTestCase(t) defer tc.Stop() - sut := tc.MustStartCluster() + sut := tc.MustStartDefaultCluster() testKeyConceptsQueryData(t, sut) } diff --git a/apptest/tests/metricsql_test.go b/apptest/tests/metricsql_test.go new file mode 100644 index 000000000..935ff5a2a --- /dev/null +++ b/apptest/tests/metricsql_test.go @@ -0,0 +1,119 @@ +package tests + +import ( + "fmt" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/apptest" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func millis(s string) int64 { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(fmt.Sprintf("could not parse time %q: %v", s, err)) + } + return t.UnixMilli() +} + +var staleNaNsData = func() []pb.TimeSeries { + return []pb.TimeSeries{ + { + Labels: []pb.Label{ + { + Name: "__name__", + Value: "metric", + }, + }, + Samples: []pb.Sample{ + { + Value: 1, + Timestamp: millis("2024-01-01T00:01:00Z"), + }, + { + Value: decimal.StaleNaN, + Timestamp: millis("2024-01-01T00:02:00Z"), + }, + }, + }, + } +}() + +func TestSingleInstantQueryDoesNotReturnStaleNaNs(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + sut := tc.MustStartDefaultVmsingle() + + testInstantQueryDoesNotReturnStaleNaNs(t, sut) +} + +func TestClusterInstantQueryDoesNotReturnStaleNaNs(t *testing.T) { + tc := apptest.NewTestCase(t) + defer tc.Stop() + + sut := tc.MustStartDefaultCluster() + + testInstantQueryDoesNotReturnStaleNaNs(t, sut) +} + +func testInstantQueryDoesNotReturnStaleNaNs(t *testing.T, sut apptest.PrometheusWriteQuerier) { + opts := apptest.QueryOpts{Timeout: "5s", Tenant: "0"} + + sut.PrometheusAPIV1Write(t, staleNaNsData, opts) + sut.ForceFlush(t) + + var got, want *apptest.PrometheusAPIV1QueryResponse + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"), + cmpopts.EquateNaNs(), + } + + // Verify that instant query returns the first point. + + got = sut.PrometheusAPIV1Query(t, "metric", "2024-01-01T00:01:00.000Z", "5m", opts) + want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}}]}}`) + want.Data.Result[0].Sample = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1) + if diff := cmp.Diff(want, got, cmpOptions...); diff != "" { + t.Errorf("unexpected response (-want, +got):\n%s", diff) + } + + // Verify that instant query does not return stale NaN. + + got = sut.PrometheusAPIV1Query(t, "metric", "2024-01-01T00:02:00.000Z", "5m", opts) + want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": []}}`) + // Empty response, stale NaN is not included into response + if diff := cmp.Diff(want, got, cmpOptions...); diff != "" { + t.Errorf("unexpected response (-want, +got):\n%s", diff) + } + + // Verify that instant query with default rollup function returns stale NaN + // while it must not. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5806 + + got = sut.PrometheusAPIV1Query(t, "metric[2m]", "2024-01-01T00:02:00.000Z", "5m", opts) + want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}, "values": []}]}}`) + s := make([]*apptest.Sample, 2) + s[0] = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1) + s[1] = apptest.NewSample(t, "2024-01-01T00:02:00Z", decimal.StaleNaN) + want.Data.Result[0].Samples = s + if diff := cmp.Diff(want, got, cmpOptions...); diff != "" { + t.Errorf("unexpected response (-want, +got):\n%s", diff) + } + + // Verify that exported data contains stale NaN. + + got = sut.PrometheusAPIV1Export(t, `{__name__="metric"}`, "2024-01-01T00:01:00.000Z", "2024-01-01T00:02:00.000Z", opts) + want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}, "values": []}]}}`) + s = make([]*apptest.Sample, 2) + s[0] = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1) + s[1] = apptest.NewSample(t, "2024-01-01T00:02:00Z", decimal.StaleNaN) + want.Data.Result[0].Samples = s + if diff := cmp.Diff(want, got, cmpOptions...); diff != "" { + t.Errorf("unexpected response (-want, +got):\n%s", diff) + } +} diff --git a/apptest/vminsert.go b/apptest/vminsert.go index 93fd653db..82f291345 100644 --- a/apptest/vminsert.go +++ b/apptest/vminsert.go @@ -7,6 +7,9 @@ import ( "strings" "testing" "time" + + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/golang/snappy" ) // Vminsert holds the state of a vminsert app and provides vminsert-specific @@ -46,6 +49,20 @@ func StartVminsert(instance string, flags []string, cli *Client) (*Vminsert, err }, nil } +// PrometheusAPIV1Write is a test helper function that inserts a +// collection of records in Prometheus remote-write format by sending a HTTP +// POST request to /prometheus/api/v1/write vminsert endpoint. +func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, opts QueryOpts) { + t.Helper() + + url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.Tenant) + wr := pb.WriteRequest{Timeseries: records} + data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) + app.sendBlocking(t, len(records), func() { + app.cli.Post(t, url, "application/x-protobuf", data, http.StatusNoContent) + }) +} + // PrometheusAPIV1ImportPrometheus is a test helper function that inserts a // collection of records in Prometheus text exposition format for the given // tenant by sending a HTTP POST request to @@ -56,9 +73,10 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str t.Helper() url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant) - wantRowsSentCount := app.rpcRowsSentTotal(t) + len(records) - app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent) - app.waitUntilSent(t, wantRowsSentCount) + data := []byte(strings.Join(records, "\n")) + app.sendBlocking(t, len(records), func() { + app.cli.Post(t, url, "text/plain", data, http.StatusNoContent) + }) } // String returns the string representation of the vminsert app state. @@ -66,21 +84,29 @@ func (app *Vminsert) String() string { return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) } -// waitUntilSent waits until vminsert sends buffered data to vmstorage. +// sendBlocking sends the data to vmstorage by executing `send` function and +// waits until the data is actually sent. +// +// vminsert does not send the data immediately. It first puts the data into a +// buffer. Then a background goroutine takes the data from the buffer sends it +// to the vmstorage. This happens every 200ms. // // Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total` // metric and checking whether it is equal or greater than the wanted value. // If it is, then the data has been sent to vmstorage. // // Unreliable if the records are inserted concurrently. -func (app *Vminsert) waitUntilSent(t *testing.T, wantRowsSentCount int) { +// TODO(rtm0): Put sending and waiting into a critical section to make reliable? +func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func()) { t.Helper() + send() + const ( retries = 20 period = 100 * time.Millisecond ) - + wantRowsSentCount := app.rpcRowsSentTotal(t) + numRecordsToSend for range retries { if app.rpcRowsSentTotal(t) >= wantRowsSentCount { return diff --git a/apptest/vmselect.go b/apptest/vmselect.go index a93fc5fbe..c606b2c0a 100644 --- a/apptest/vmselect.go +++ b/apptest/vmselect.go @@ -55,9 +55,28 @@ func (app *Vmselect) ClusternativeListenAddr() string { return app.clusternativeListenAddr } +// PrometheusAPIV1Export is a test helper function that performs the export of +// raw samples in JSON line format by sending a HTTP POST request to +// /prometheus/api/v1/export vmselect endpoint. +// +// See https://docs.victoriametrics.com/url-examples/#apiv1export +func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse { + t.Helper() + + exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export", app.httpListenAddr, opts.Tenant) + values := url.Values{} + values.Add("match[]", query) + values.Add("start", start) + values.Add("end", end) + values.Add("timeout", opts.Timeout) + values.Add("format", "promapi") + res := app.cli.PostForm(t, exportURL, values, http.StatusOK) + return NewPrometheusAPIV1QueryResponse(t, res) +} + // PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL // instant query by sending a HTTP POST request to /prometheus/api/v1/query -// vmsingle endpoint. +// vmselect endpoint. // // See https://docs.victoriametrics.com/url-examples/#apiv1query func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query, time, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse { @@ -75,7 +94,7 @@ func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query, time, step string // PrometheusAPIV1QueryRange is a test helper function that performs // PromQL/MetricsQL range query by sending a HTTP POST request to -// /prometheus/api/v1/query_range vmsingle endpoint. +// /prometheus/api/v1/query_range vmselect endpoint. // // See https://docs.victoriametrics.com/url-examples/#apiv1query_range func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query, start, end, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse { diff --git a/apptest/vmsingle.go b/apptest/vmsingle.go index 03c5a5aa3..c1aa77b40 100644 --- a/apptest/vmsingle.go +++ b/apptest/vmsingle.go @@ -9,6 +9,9 @@ import ( "strings" "testing" "time" + + pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/golang/snappy" ) // Vmsingle holds the state of a vmsingle app and provides vmsingle-specific @@ -20,11 +23,18 @@ type Vmsingle struct { storageDataPath string httpListenAddr string - forceFlushURL string + // vmstorage URLs. + forceFlushURL string + + // vminsert URLs. prometheusAPIV1ImportPrometheusURL string - prometheusAPIV1QueryURL string - prometheusAPIV1QueryRangeURL string - prometheusAPIV1SeriesURL string + prometheusAPIV1WriteURL string + + // vmselect URLs. + prometheusAPIV1ExportURL string + prometheusAPIV1QueryURL string + prometheusAPIV1QueryRangeURL string + prometheusAPIV1SeriesURL string } // StartVmsingle starts an instance of vmsingle with the given flags. It also @@ -56,6 +66,8 @@ func StartVmsingle(instance string, flags []string, cli *Client) (*Vmsingle, err forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]), prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]), + prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]), + prometheusAPIV1ExportURL: fmt.Sprintf("http://%s/prometheus/api/v1/export", stderrExtracts[1]), prometheusAPIV1QueryURL: fmt.Sprintf("http://%s/prometheus/api/v1/query", stderrExtracts[1]), prometheusAPIV1QueryRangeURL: fmt.Sprintf("http://%s/prometheus/api/v1/query_range", stderrExtracts[1]), prometheusAPIV1SeriesURL: fmt.Sprintf("http://%s/prometheus/api/v1/series", stderrExtracts[1]), @@ -70,6 +82,17 @@ func (app *Vmsingle) ForceFlush(t *testing.T) { app.cli.Get(t, app.forceFlushURL, http.StatusOK) } +// PrometheusAPIV1Write is a test helper function that inserts a +// collection of records in Prometheus remote-write format by sending a HTTP +// POST request to /prometheus/api/v1/write vmsingle endpoint. +func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, _ QueryOpts) { + t.Helper() + + wr := pb.WriteRequest{Timeseries: records} + data := snappy.Encode(nil, wr.MarshalProtobuf(nil)) + app.cli.Post(t, app.prometheusAPIV1WriteURL, "application/x-protobuf", data, http.StatusNoContent) +} + // PrometheusAPIV1ImportPrometheus is a test helper function that inserts a // collection of records in Prometheus text exposition format by sending a HTTP // POST request to /prometheus/api/v1/import/prometheus vmsingle endpoint. @@ -78,7 +101,26 @@ func (app *Vmsingle) ForceFlush(t *testing.T) { func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, _ QueryOpts) { t.Helper() - app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", strings.Join(records, "\n"), http.StatusNoContent) + data := []byte(strings.Join(records, "\n")) + app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", data, http.StatusNoContent) +} + +// PrometheusAPIV1Export is a test helper function that performs the export of +// raw samples in JSON line format by sending a HTTP POST request to +// /prometheus/api/v1/export vmsingle endpoint. +// +// See https://docs.victoriametrics.com/url-examples/#apiv1export +func (app *Vmsingle) PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse { + t.Helper() + + values := url.Values{} + values.Add("match[]", query) + values.Add("start", start) + values.Add("end", end) + values.Add("timeout", opts.Timeout) + values.Add("format", "promapi") + res := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, http.StatusOK) + return NewPrometheusAPIV1QueryResponse(t, res) } // PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL