apptest: add tests for stale nans in instant query (#7621)

### Describe Your Changes

These are the integration tests that confirm that instant queries may
return stale NaNs when the query contains a rollup function.

The bug was reported at #5806. There is also a fix: #7275. The tests in
this PR will be used co confirm that the fix works.

Some test refactoring has been done along the way. Sorry, couldn't
resist.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev 2024-11-21 19:39:17 +01:00 committed by f41gh7
parent 53c2214940
commit d238011327
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
8 changed files with 245 additions and 25 deletions

View file

@ -1,6 +1,7 @@
package apptest
import (
"bytes"
"io"
"net/http"
"net/url"
@ -36,13 +37,13 @@ func (c *Client) CloseConnections() {
// the response body to the caller.
func (c *Client) Get(t *testing.T, url string, wantStatusCode int) string {
t.Helper()
return c.do(t, http.MethodGet, url, "", "", wantStatusCode)
return c.do(t, http.MethodGet, url, "", nil, wantStatusCode)
}
// Post sends a HTTP POST request. Once the function receives a response, it
// checks whether the response status code matches the expected one and returns
// the response body to the caller.
func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCode int) string {
func (c *Client) Post(t *testing.T, url, contentType string, data []byte, wantStatusCode int) string {
t.Helper()
return c.do(t, http.MethodPost, url, contentType, data, wantStatusCode)
}
@ -52,16 +53,16 @@ func (c *Client) Post(t *testing.T, url, contentType, data string, wantStatusCod
// matches the expected one and returns the response body to the caller.
func (c *Client) PostForm(t *testing.T, url string, data url.Values, wantStatusCode int) string {
t.Helper()
return c.Post(t, url, "application/x-www-form-urlencoded", data.Encode(), wantStatusCode)
return c.Post(t, url, "application/x-www-form-urlencoded", []byte(data.Encode()), wantStatusCode)
}
// do prepares a HTTP request, sends it to the server, receives the response
// from the server, ensures then response code matches the expected one, reads
// the rentire response body and returns it to the caller.
func (c *Client) do(t *testing.T, method, url, contentType, data string, wantStatusCode int) string {
func (c *Client) do(t *testing.T, method, url, contentType string, data []byte, wantStatusCode int) string {
t.Helper()
req, err := http.NewRequest(method, url, strings.NewReader(data))
req, err := http.NewRequest(method, url, bytes.NewReader(data))
if err != nil {
t.Fatalf("could not create a HTTP request: %v", err)
}

View file

@ -8,10 +8,13 @@ import (
"strings"
"testing"
"time"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// PrometheusQuerier contains methods available to Prometheus-like HTTP API for Querying
type PrometheusQuerier interface {
PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse
PrometheusAPIV1Query(t *testing.T, query, time, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse
PrometheusAPIV1QueryRange(t *testing.T, query, start, end, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse
PrometheusAPIV1Series(t *testing.T, matchQuery string, opts QueryOpts) *PrometheusAPIV1SeriesResponse
@ -19,6 +22,7 @@ type PrometheusQuerier interface {
// PrometheusWriter contains methods available to Prometheus-like HTTP API for Writing new data
type PrometheusWriter interface {
PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, opts QueryOpts)
PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts)
}

View file

@ -54,6 +54,17 @@ func (tc *TestCase) Stop() {
}
}
// MustStartDefaultVmsingle is a test helper function that starts an instance of
// vmsingle with defaults suitable for most tests.
func (tc *TestCase) MustStartDefaultVmsingle() *Vmsingle {
tc.t.Helper()
return tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + tc.Dir() + "/vmsingle",
"-retentionPeriod=100y",
})
}
// MustStartVmsingle is a test helper function that starts an instance of
// vmsingle and fails the test if the app fails to start.
func (tc *TestCase) MustStartVmsingle(instance string, flags []string) *Vmsingle {
@ -118,7 +129,8 @@ func (c *vmcluster) ForceFlush(t *testing.T) {
}
}
// MustStartCluster is a typical cluster configuration.
// MustStartDefaultCluster is a typical cluster configuration suitable for most
// tests.
//
// The cluster consists of two vmstorages, one vminsert and one vmselect, no
// data replication.
@ -128,7 +140,7 @@ func (c *vmcluster) ForceFlush(t *testing.T) {
// vmselect) but instead just need a typical cluster configuration to verify
// some business logic (such as API surface, or MetricsQL). Such cluster
// tests usually come paired with corresponding vmsingle tests.
func (tc *TestCase) MustStartCluster() PrometheusWriteQuerier {
func (tc *TestCase) MustStartDefaultCluster() PrometheusWriteQuerier {
tc.t.Helper()
vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{

View file

@ -33,10 +33,7 @@ func TestSingleKeyConceptsQuery(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + tc.Dir() + "/vmstorage",
"-retentionPeriod=100y",
})
sut := tc.MustStartDefaultVmsingle()
testKeyConceptsQueryData(t, sut)
}
@ -47,7 +44,7 @@ func TestClusterKeyConceptsQueryData(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartCluster()
sut := tc.MustStartDefaultCluster()
testKeyConceptsQueryData(t, sut)
}

View file

@ -0,0 +1,119 @@
package tests
import (
"fmt"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
func millis(s string) int64 {
t, err := time.Parse(time.RFC3339, s)
if err != nil {
panic(fmt.Sprintf("could not parse time %q: %v", s, err))
}
return t.UnixMilli()
}
var staleNaNsData = func() []pb.TimeSeries {
return []pb.TimeSeries{
{
Labels: []pb.Label{
{
Name: "__name__",
Value: "metric",
},
},
Samples: []pb.Sample{
{
Value: 1,
Timestamp: millis("2024-01-01T00:01:00Z"),
},
{
Value: decimal.StaleNaN,
Timestamp: millis("2024-01-01T00:02:00Z"),
},
},
},
}
}()
func TestSingleInstantQueryDoesNotReturnStaleNaNs(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartDefaultVmsingle()
testInstantQueryDoesNotReturnStaleNaNs(t, sut)
}
func TestClusterInstantQueryDoesNotReturnStaleNaNs(t *testing.T) {
tc := apptest.NewTestCase(t)
defer tc.Stop()
sut := tc.MustStartDefaultCluster()
testInstantQueryDoesNotReturnStaleNaNs(t, sut)
}
func testInstantQueryDoesNotReturnStaleNaNs(t *testing.T, sut apptest.PrometheusWriteQuerier) {
opts := apptest.QueryOpts{Timeout: "5s", Tenant: "0"}
sut.PrometheusAPIV1Write(t, staleNaNsData, opts)
sut.ForceFlush(t)
var got, want *apptest.PrometheusAPIV1QueryResponse
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(apptest.PrometheusAPIV1QueryResponse{}, "Status", "Data.ResultType"),
cmpopts.EquateNaNs(),
}
// Verify that instant query returns the first point.
got = sut.PrometheusAPIV1Query(t, "metric", "2024-01-01T00:01:00.000Z", "5m", opts)
want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}}]}}`)
want.Data.Result[0].Sample = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1)
if diff := cmp.Diff(want, got, cmpOptions...); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// Verify that instant query does not return stale NaN.
got = sut.PrometheusAPIV1Query(t, "metric", "2024-01-01T00:02:00.000Z", "5m", opts)
want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": []}}`)
// Empty response, stale NaN is not included into response
if diff := cmp.Diff(want, got, cmpOptions...); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// Verify that instant query with default rollup function returns stale NaN
// while it must not.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5806
got = sut.PrometheusAPIV1Query(t, "metric[2m]", "2024-01-01T00:02:00.000Z", "5m", opts)
want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}, "values": []}]}}`)
s := make([]*apptest.Sample, 2)
s[0] = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1)
s[1] = apptest.NewSample(t, "2024-01-01T00:02:00Z", decimal.StaleNaN)
want.Data.Result[0].Samples = s
if diff := cmp.Diff(want, got, cmpOptions...); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
// Verify that exported data contains stale NaN.
got = sut.PrometheusAPIV1Export(t, `{__name__="metric"}`, "2024-01-01T00:01:00.000Z", "2024-01-01T00:02:00.000Z", opts)
want = apptest.NewPrometheusAPIV1QueryResponse(t, `{"data": {"result": [{"metric": {"__name__": "metric"}, "values": []}]}}`)
s = make([]*apptest.Sample, 2)
s[0] = apptest.NewSample(t, "2024-01-01T00:01:00Z", 1)
s[1] = apptest.NewSample(t, "2024-01-01T00:02:00Z", decimal.StaleNaN)
want.Data.Result[0].Samples = s
if diff := cmp.Diff(want, got, cmpOptions...); diff != "" {
t.Errorf("unexpected response (-want, +got):\n%s", diff)
}
}

View file

@ -7,6 +7,9 @@ import (
"strings"
"testing"
"time"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/golang/snappy"
)
// Vminsert holds the state of a vminsert app and provides vminsert-specific
@ -46,6 +49,20 @@ func StartVminsert(instance string, flags []string, cli *Client) (*Vminsert, err
}, nil
}
// PrometheusAPIV1Write is a test helper function that inserts a
// collection of records in Prometheus remote-write format by sending a HTTP
// POST request to /prometheus/api/v1/write vminsert endpoint.
func (app *Vminsert) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, opts QueryOpts) {
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/write", app.httpListenAddr, opts.Tenant)
wr := pb.WriteRequest{Timeseries: records}
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
app.sendBlocking(t, len(records), func() {
app.cli.Post(t, url, "application/x-protobuf", data, http.StatusNoContent)
})
}
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
// collection of records in Prometheus text exposition format for the given
// tenant by sending a HTTP POST request to
@ -56,9 +73,10 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant)
wantRowsSentCount := app.rpcRowsSentTotal(t) + len(records)
app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent)
app.waitUntilSent(t, wantRowsSentCount)
data := []byte(strings.Join(records, "\n"))
app.sendBlocking(t, len(records), func() {
app.cli.Post(t, url, "text/plain", data, http.StatusNoContent)
})
}
// String returns the string representation of the vminsert app state.
@ -66,21 +84,29 @@ func (app *Vminsert) String() string {
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
}
// waitUntilSent waits until vminsert sends buffered data to vmstorage.
// sendBlocking sends the data to vmstorage by executing `send` function and
// waits until the data is actually sent.
//
// vminsert does not send the data immediately. It first puts the data into a
// buffer. Then a background goroutine takes the data from the buffer sends it
// to the vmstorage. This happens every 200ms.
//
// Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total`
// metric and checking whether it is equal or greater than the wanted value.
// If it is, then the data has been sent to vmstorage.
//
// Unreliable if the records are inserted concurrently.
func (app *Vminsert) waitUntilSent(t *testing.T, wantRowsSentCount int) {
// TODO(rtm0): Put sending and waiting into a critical section to make reliable?
func (app *Vminsert) sendBlocking(t *testing.T, numRecordsToSend int, send func()) {
t.Helper()
send()
const (
retries = 20
period = 100 * time.Millisecond
)
wantRowsSentCount := app.rpcRowsSentTotal(t) + numRecordsToSend
for range retries {
if app.rpcRowsSentTotal(t) >= wantRowsSentCount {
return

View file

@ -55,9 +55,28 @@ func (app *Vmselect) ClusternativeListenAddr() string {
return app.clusternativeListenAddr
}
// PrometheusAPIV1Export is a test helper function that performs the export of
// raw samples in JSON line format by sending a HTTP POST request to
// /prometheus/api/v1/export vmselect endpoint.
//
// See https://docs.victoriametrics.com/url-examples/#apiv1export
func (app *Vmselect) PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
t.Helper()
exportURL := fmt.Sprintf("http://%s/select/%s/prometheus/api/v1/export", app.httpListenAddr, opts.Tenant)
values := url.Values{}
values.Add("match[]", query)
values.Add("start", start)
values.Add("end", end)
values.Add("timeout", opts.Timeout)
values.Add("format", "promapi")
res := app.cli.PostForm(t, exportURL, values, http.StatusOK)
return NewPrometheusAPIV1QueryResponse(t, res)
}
// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL
// instant query by sending a HTTP POST request to /prometheus/api/v1/query
// vmsingle endpoint.
// vmselect endpoint.
//
// See https://docs.victoriametrics.com/url-examples/#apiv1query
func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query, time, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
@ -75,7 +94,7 @@ func (app *Vmselect) PrometheusAPIV1Query(t *testing.T, query, time, step string
// PrometheusAPIV1QueryRange is a test helper function that performs
// PromQL/MetricsQL range query by sending a HTTP POST request to
// /prometheus/api/v1/query_range vmsingle endpoint.
// /prometheus/api/v1/query_range vmselect endpoint.
//
// See https://docs.victoriametrics.com/url-examples/#apiv1query_range
func (app *Vmselect) PrometheusAPIV1QueryRange(t *testing.T, query, start, end, step string, opts QueryOpts) *PrometheusAPIV1QueryResponse {

View file

@ -9,6 +9,9 @@ import (
"strings"
"testing"
"time"
pb "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/golang/snappy"
)
// Vmsingle holds the state of a vmsingle app and provides vmsingle-specific
@ -20,8 +23,15 @@ type Vmsingle struct {
storageDataPath string
httpListenAddr string
// vmstorage URLs.
forceFlushURL string
// vminsert URLs.
prometheusAPIV1ImportPrometheusURL string
prometheusAPIV1WriteURL string
// vmselect URLs.
prometheusAPIV1ExportURL string
prometheusAPIV1QueryURL string
prometheusAPIV1QueryRangeURL string
prometheusAPIV1SeriesURL string
@ -56,6 +66,8 @@ func StartVmsingle(instance string, flags []string, cli *Client) (*Vmsingle, err
forceFlushURL: fmt.Sprintf("http://%s/internal/force_flush", stderrExtracts[1]),
prometheusAPIV1ImportPrometheusURL: fmt.Sprintf("http://%s/prometheus/api/v1/import/prometheus", stderrExtracts[1]),
prometheusAPIV1WriteURL: fmt.Sprintf("http://%s/prometheus/api/v1/write", stderrExtracts[1]),
prometheusAPIV1ExportURL: fmt.Sprintf("http://%s/prometheus/api/v1/export", stderrExtracts[1]),
prometheusAPIV1QueryURL: fmt.Sprintf("http://%s/prometheus/api/v1/query", stderrExtracts[1]),
prometheusAPIV1QueryRangeURL: fmt.Sprintf("http://%s/prometheus/api/v1/query_range", stderrExtracts[1]),
prometheusAPIV1SeriesURL: fmt.Sprintf("http://%s/prometheus/api/v1/series", stderrExtracts[1]),
@ -70,6 +82,17 @@ func (app *Vmsingle) ForceFlush(t *testing.T) {
app.cli.Get(t, app.forceFlushURL, http.StatusOK)
}
// PrometheusAPIV1Write is a test helper function that inserts a
// collection of records in Prometheus remote-write format by sending a HTTP
// POST request to /prometheus/api/v1/write vmsingle endpoint.
func (app *Vmsingle) PrometheusAPIV1Write(t *testing.T, records []pb.TimeSeries, _ QueryOpts) {
t.Helper()
wr := pb.WriteRequest{Timeseries: records}
data := snappy.Encode(nil, wr.MarshalProtobuf(nil))
app.cli.Post(t, app.prometheusAPIV1WriteURL, "application/x-protobuf", data, http.StatusNoContent)
}
// PrometheusAPIV1ImportPrometheus is a test helper function that inserts a
// collection of records in Prometheus text exposition format by sending a HTTP
// POST request to /prometheus/api/v1/import/prometheus vmsingle endpoint.
@ -78,7 +101,26 @@ func (app *Vmsingle) ForceFlush(t *testing.T) {
func (app *Vmsingle) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, _ QueryOpts) {
t.Helper()
app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", strings.Join(records, "\n"), http.StatusNoContent)
data := []byte(strings.Join(records, "\n"))
app.cli.Post(t, app.prometheusAPIV1ImportPrometheusURL, "text/plain", data, http.StatusNoContent)
}
// PrometheusAPIV1Export is a test helper function that performs the export of
// raw samples in JSON line format by sending a HTTP POST request to
// /prometheus/api/v1/export vmsingle endpoint.
//
// See https://docs.victoriametrics.com/url-examples/#apiv1export
func (app *Vmsingle) PrometheusAPIV1Export(t *testing.T, query, start, end string, opts QueryOpts) *PrometheusAPIV1QueryResponse {
t.Helper()
values := url.Values{}
values.Add("match[]", query)
values.Add("start", start)
values.Add("end", end)
values.Add("timeout", opts.Timeout)
values.Add("format", "promapi")
res := app.cli.PostForm(t, app.prometheusAPIV1ExportURL, values, http.StatusOK)
return NewPrometheusAPIV1QueryResponse(t, res)
}
// PrometheusAPIV1Query is a test helper function that performs PromQL/MetricsQL