apptest: typical cluster configuration for business logic tests

Add the ability to create a simple cluster configuration for tests that
do not verify the cluster-specific behavior but instead are focused on
the business logic tests, such as API surface or MetricsQL. For such
tests this cluster configuration will be enough in most cases.

Cluster-specific tests should continue creating custom configurations.

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
This commit is contained in:
Artem Fetishev 2024-11-20 16:30:55 +01:00 committed by f41gh7
parent 92e2e6f951
commit f79406a051
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
9 changed files with 322 additions and 85 deletions

View file

@ -91,3 +91,32 @@ jobs:
uses: codecov/codecov-action@v4 uses: codecov/codecov-action@v4
with: with:
file: ./coverage.txt file: ./coverage.txt
integration-test:
name: integration-test
needs: [lint, test]
runs-on: ubuntu-latest
steps:
- name: Code checkout
uses: actions/checkout@v4
- name: Setup Go
id: go
uses: actions/setup-go@v5
with:
cache: false
go-version: stable
- name: Cache Go artifacts
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
~/go/bin
~/go/pkg/mod
key: go-artifacts-${{ runner.os }}-${{ matrix.scenario }}-${{ steps.go.outputs.go-version }}-${{ hashFiles('go.sum', 'Makefile', 'app/**/Makefile') }}
restore-keys: go-artifacts-${{ runner.os }}-${{ matrix.scenario }}-
- name: Run integration tests
run: make integration-test

View file

@ -219,7 +219,7 @@ test-full-386:
DISABLE_FSYNC_FOR_TESTING=1 GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/... DISABLE_FSYNC_FOR_TESTING=1 GOARCH=386 go test -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
integration-test: all integration-test: all
go test ./apptest/... -skip="^TestSingle.*" go test ./apptest/... -skip="^TestSingle.*" -v
benchmark: benchmark:
go test -bench=. ./lib/... go test -bench=. ./lib/...

View file

@ -128,3 +128,31 @@ func (app *ServesMetrics) GetMetric(t *testing.T, metricName string) float64 {
t.Fatalf("metic not found: %s", metricName) t.Fatalf("metic not found: %s", metricName)
return 0 return 0
} }
// GetMetricsByPrefix retrieves the values of all metrics that start with given
// prefix.
func (app *ServesMetrics) GetMetricsByPrefix(t *testing.T, prefix string) []float64 {
t.Helper()
values := []float64{}
metrics := app.cli.Get(t, app.metricsURL, http.StatusOK)
for _, metric := range strings.Split(metrics, "\n") {
if !strings.HasPrefix(metric, prefix) {
continue
}
parts := strings.Split(metric, " ")
if len(parts) < 2 {
t.Fatalf("unexpected record format: got %q, want metric name and value separated by a space", metric)
}
value, err := strconv.ParseFloat(parts[len(parts)-1], 64)
if err != nil {
t.Fatalf("could not parse metric value %s: %v", metric, err)
}
values = append(values, value)
}
return values
}

View file

@ -3,7 +3,9 @@ package apptest
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"slices"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
) )
@ -20,6 +22,20 @@ type PrometheusWriter interface {
PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts) PrometheusAPIV1ImportPrometheus(t *testing.T, records []string, opts QueryOpts)
} }
// StorageFlusher defines a method that forces the flushing of data inserted
// into the storage, so it becomes available for searching immediately.
type StorageFlusher interface {
ForceFlush(t *testing.T)
}
// PrometheusWriteQuerier encompasses the methods for writing, flushing and
// querying the data.
type PrometheusWriteQuerier interface {
PrometheusWriter
PrometheusQuerier
StorageFlusher
}
// QueryOpts contains various params used for querying or ingesting data // QueryOpts contains various params used for querying or ingesting data
type QueryOpts struct { type QueryOpts struct {
Tenant string Tenant string
@ -119,3 +135,19 @@ func NewPrometheusAPIV1SeriesResponse(t *testing.T, s string) *PrometheusAPIV1Se
} }
return res return res
} }
// Sort sorts the response data.
func (r *PrometheusAPIV1SeriesResponse) Sort() {
str := func(m map[string]string) string {
s := []string{}
for k, v := range m {
s = append(s, k+v)
}
slices.Sort(s)
return strings.Join(s, "")
}
slices.SortFunc(r.Data, func(a, b map[string]string) int {
return strings.Compare(str(a), str(b))
})
}

View file

@ -1,9 +1,12 @@
package apptest package apptest
import ( import (
"fmt"
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/google/go-cmp/cmp"
) )
// TestCase holds the state and defines clean-up procedure common for all test // TestCase holds the state and defines clean-up procedure common for all test
@ -103,6 +106,124 @@ func (tc *TestCase) MustStartVminsert(instance string, flags []string) *Vminsert
return app return app
} }
type vmcluster struct {
*Vminsert
*Vmselect
vmstorages []*Vmstorage
}
func (c *vmcluster) ForceFlush(t *testing.T) {
for _, s := range c.vmstorages {
s.ForceFlush(t)
}
}
// MustStartCluster is a typical cluster configuration.
//
// The cluster consists of two vmstorages, one vminsert and one vmselect, no
// data replication.
//
// Such configuration is suitable for tests that don't verify the
// cluster-specific behavior (such as sharding, replication, or multilevel
// vmselect) but instead just need a typical cluster configuration to verify
// some business logic (such as API surface, or MetricsQL). Such cluster
// tests usually come paired with corresponding vmsingle tests.
func (tc *TestCase) MustStartCluster() PrometheusWriteQuerier {
tc.t.Helper()
vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{
"-storageDataPath=" + tc.Dir() + "/vmstorage-1",
"-retentionPeriod=100y",
})
vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{
"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
"-retentionPeriod=100y",
})
vminsert := tc.MustStartVminsert("vminsert", []string{
"-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(),
})
return &vmcluster{vminsert, vmselect, []*Vmstorage{vmstorage1, vmstorage2}}
}
func (tc *TestCase) addApp(app Stopper) { func (tc *TestCase) addApp(app Stopper) {
tc.startedApps = append(tc.startedApps, app) tc.startedApps = append(tc.startedApps, app)
} }
// AssertOptions hold the assertion params, such as got and wanted values as
// well as the message that should be included into the assertion error message
// in case of failure.
//
// In VictoriaMetrics (especially the cluster version) the inserted data does
// not become visible for querying right away. Therefore, the first comparisons
// may fail. AssertOptions allow to configure how many times the actual result
// must be retrieved and compared with the expected one and for long to wait
// between the retries. If these two params (`Retries` and `Period`) are not
// set, the default values will be used.
//
// If it is known that the data is available, then the retry functionality can
// be disabled by setting the `DoNotRetry` field.
//
// AssertOptions are used by the TestCase.Assert() method, and this method uses
// cmp.Diff() from go-cmp package for comparing got and wanted values.
// AssertOptions, therefore, allows to pass cmp.Options to cmp.Diff() via
// `CmpOpts` field.
//
// Finally the `FailNow` field controls whether the assertion should fail using
// `testing.T.Errorf()` or `testing.T.Fatalf()`.
type AssertOptions struct {
Msg string
Got func() any
Want any
CmpOpts []cmp.Option
DoNotRetry bool
Retries int
Period time.Duration
FailNow bool
}
// Assert compares the actual result with the expected one possibly multiple
// times in order to account for the fact that the inserted data does not become
// available for querying right away (especially in cluster version of
// VictoriaMetrics).
func (tc *TestCase) Assert(opts *AssertOptions) {
tc.t.Helper()
const (
defaultRetries = 20
defaultPeriod = 100 * time.Millisecond
)
if opts.DoNotRetry {
opts.Retries = 1
opts.Period = 0
} else {
if opts.Retries <= 0 {
opts.Retries = defaultRetries
}
if opts.Period <= 0 {
opts.Period = defaultPeriod
}
}
var diff string
for range opts.Retries {
diff = cmp.Diff(opts.Want, opts.Got(), opts.CmpOpts...)
if diff == "" {
return
}
time.Sleep(opts.Period)
}
msg := fmt.Sprintf("%s (-want, +got):\n%s", opts.Msg, diff)
if opts.FailNow {
tc.t.Fatal(msg)
} else {
tc.t.Error(msg)
}
}

View file

@ -2,7 +2,6 @@ package tests
import ( import (
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest" "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
@ -29,67 +28,41 @@ var docData = []string{
} }
// TestSingleKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data // TestSingleKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data
// for vm-single.
func TestSingleKeyConceptsQuery(t *testing.T) { func TestSingleKeyConceptsQuery(t *testing.T) {
tc := apptest.NewTestCase(t) tc := apptest.NewTestCase(t)
defer tc.Stop() defer tc.Stop()
vmsingle := tc.MustStartVmsingle("vmsingle", []string{ sut := tc.MustStartVmsingle("vmsingle", []string{
"-storageDataPath=" + tc.Dir() + "/vmstorage", "-storageDataPath=" + tc.Dir() + "/vmstorage",
"-retentionPeriod=100y", "-retentionPeriod=100y",
}) })
opts := apptest.QueryOpts{Timeout: "5s"} testKeyConceptsQueryData(t, sut)
// Insert example data from documentation.
vmsingle.PrometheusAPIV1ImportPrometheus(t, docData, opts)
vmsingle.ForceFlush(t)
testInstantQuery(t, vmsingle, opts)
testRangeQuery(t, vmsingle, opts)
testRangeQueryIsEquivalentToManyInstantQueries(t, vmsingle, opts)
} }
// TestClusterKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data // TestClusterKeyConceptsQueryData verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data
func TestClusterKeyConceptsQuery(t *testing.T) { // for vm-cluster.
func TestClusterKeyConceptsQueryData(t *testing.T) {
tc := apptest.NewTestCase(t) tc := apptest.NewTestCase(t)
defer tc.Stop() defer tc.Stop()
// Set up the following cluster configuration: sut := tc.MustStartCluster()
//
// - two vmstorage instances
// - vminsert points to the two vmstorages, its replication setting
// is off which means it will only shard the incoming data across the two
// vmstorages.
// - vmselect points to the two vmstorages and is expected to query both
// vmstorages and build the full result out of the two partial results.
vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{ testKeyConceptsQueryData(t, sut)
"-storageDataPath=" + tc.Dir() + "/vmstorage-1", }
"-retentionPeriod=100y",
})
vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{
"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
"-retentionPeriod=100y",
})
vminsert := tc.MustStartVminsert("vminsert", []string{
"-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
})
vmselect := tc.MustStartVmselect("vmselect", []string{
"-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(),
})
// testClusterKeyConceptsQuery verifies cases from https://docs.victoriametrics.com/keyconcepts/#query-data
func testKeyConceptsQueryData(t *testing.T, sut apptest.PrometheusWriteQuerier) {
opts := apptest.QueryOpts{Timeout: "5s", Tenant: "0"} opts := apptest.QueryOpts{Timeout: "5s", Tenant: "0"}
// Insert example data from documentation. // Insert example data from documentation.
vminsert.PrometheusAPIV1ImportPrometheus(t, docData, opts) sut.PrometheusAPIV1ImportPrometheus(t, docData, opts)
time.Sleep(2 * time.Second) sut.ForceFlush(t)
vmstorage1.ForceFlush(t) testInstantQuery(t, sut, opts)
vmstorage2.ForceFlush(t) testRangeQuery(t, sut, opts)
testRangeQueryIsEquivalentToManyInstantQueries(t, sut, opts)
testInstantQuery(t, vmselect, opts)
testRangeQuery(t, vmselect, opts)
testRangeQueryIsEquivalentToManyInstantQueries(t, vmselect, opts)
} }
// testInstantQuery verifies the statements made in the `Instant query` section // testInstantQuery verifies the statements made in the `Instant query` section

View file

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"math/rand/v2" "math/rand/v2"
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest" "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
) )
@ -33,30 +32,41 @@ func TestClusterMultilevelSelect(t *testing.T) {
"-storageNode=" + vmselectL1.ClusternativeListenAddr(), "-storageNode=" + vmselectL1.ClusternativeListenAddr(),
}) })
// Insert 1000 unique time series.Wait for 2 seconds to let vmstorage // Insert 1000 unique time series.
// flush pending items so they become searchable.
const numMetrics = 1000 const numMetrics = 1000
records := make([]string, numMetrics) records := make([]string, numMetrics)
want := &apptest.PrometheusAPIV1SeriesResponse{
Status: "success",
IsPartial: false,
Data: make([]map[string]string, numMetrics),
}
for i := range numMetrics { for i := range numMetrics {
records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) name := fmt.Sprintf("metric_%d", i)
records[i] = fmt.Sprintf("%s %d", name, rand.IntN(1000))
want.Data[i] = map[string]string{"__name__": name}
} }
vminsert.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{Tenant: "0"}) want.Sort()
time.Sleep(2 * time.Second) qopts := apptest.QueryOpts{Tenant: "0"}
vminsert.PrometheusAPIV1ImportPrometheus(t, records, qopts)
vmstorage.ForceFlush(t)
// Retrieve all time series and verify that vmselect (L1) serves the complete // Retrieve all time series and verify that both vmselect (L1) and
// set of time series. // vmselect (L2) serve the complete set of time series.
seriesL1 := vmselectL1.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"}) got := func(app *apptest.Vmselect) any {
if got, want := len(seriesL1.Data), numMetrics; got != want { res := app.PrometheusAPIV1Series(t, `{__name__=~".*"}`, qopts)
t.Fatalf("unexpected level-1 series count: got %d, want %d", got, want) res.Sort()
} return res
// Retrieve all time series and verify that vmselect (L2) serves the complete
// set of time series.
seriesL2 := vmselectL2.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"})
if got, want := len(seriesL2.Data), numMetrics; got != want {
t.Fatalf("unexpected level-2 series count: got %d, want %d", got, want)
} }
tc.Assert(&apptest.AssertOptions{
Msg: "unexpected level-1 series count",
Got: func() any { return got(vmselectL1) },
Want: want,
})
tc.Assert(&apptest.AssertOptions{
Msg: "unexpected level-2 series count",
Got: func() any { return got(vmselectL2) },
Want: want,
})
} }

View file

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"math/rand/v2" "math/rand/v2"
"testing" "testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest" "github.com/VictoriaMetrics/VictoriaMetrics/apptest"
) )
@ -35,20 +34,28 @@ func TestClusterVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing.
"-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(), "-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(),
}) })
// Insert 1000 unique time series and verify the that inserted data has been // Insert 1000 unique time series.
// indeed sharded by checking various metrics exposed by vminsert and
// vmstorage.
// Also wait for 2 seconds to let vminsert and vmstorage servers to update
// the values of the metrics they expose and to let vmstorages flush pending
// items so they become searchable.
const numMetrics = 1000 const numMetrics = 1000
records := make([]string, numMetrics) records := make([]string, numMetrics)
for i := range numMetrics { want := &apptest.PrometheusAPIV1SeriesResponse{
records[i] = fmt.Sprintf("metric_%d %d", i, rand.IntN(1000)) Status: "success",
IsPartial: false,
Data: make([]map[string]string, numMetrics),
} }
vminsert.PrometheusAPIV1ImportPrometheus(t, records, apptest.QueryOpts{Tenant: "0"}) for i := range numMetrics {
time.Sleep(2 * time.Second) name := fmt.Sprintf("metric_%d", i)
records[i] = fmt.Sprintf("%s %d", name, rand.IntN(1000))
want.Data[i] = map[string]string{"__name__": name}
}
want.Sort()
qopts := apptest.QueryOpts{Tenant: "0"}
vminsert.PrometheusAPIV1ImportPrometheus(t, records, qopts)
vmstorage1.ForceFlush(t)
vmstorage2.ForceFlush(t)
// Verify that inserted data has been indeed sharded by checking metrics
// exposed by vmstorage.
numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total") numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total")
if numMetrics1 == 0 { if numMetrics1 == 0 {
@ -65,14 +72,13 @@ func TestClusterVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing.
// Retrieve all time series and verify that vmselect serves the complete set // Retrieve all time series and verify that vmselect serves the complete set
// of time series. // of time series.
series := vmselect.PrometheusAPIV1Series(t, `{__name__=~".*"}`, apptest.QueryOpts{Tenant: "0"}) tc.Assert(&apptest.AssertOptions{
if got, want := series.Status, "success"; got != want { Msg: "unexpected /api/v1/series response",
t.Fatalf("unexpected /ap1/v1/series response status: got %s, want %s", got, want) Got: func() any {
} res := vmselect.PrometheusAPIV1Series(t, `{__name__=~".*"}`, qopts)
if got, want := series.IsPartial, false; got != want { res.Sort()
t.Fatalf("unexpected /ap1/v1/series response isPartial value: got %t, want %t", got, want) return res
} },
if got, want := len(series.Data), numMetrics; got != want { Want: want,
t.Fatalf("unexpected /ap1/v1/series response series count: got %d, want %d", got, want) })
}
} }

View file

@ -6,6 +6,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"testing" "testing"
"time"
) )
// Vminsert holds the state of a vminsert app and provides vminsert-specific // Vminsert holds the state of a vminsert app and provides vminsert-specific
@ -55,10 +56,47 @@ func (app *Vminsert) PrometheusAPIV1ImportPrometheus(t *testing.T, records []str
t.Helper() t.Helper()
url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant) url := fmt.Sprintf("http://%s/insert/%s/prometheus/api/v1/import/prometheus", app.httpListenAddr, opts.Tenant)
wantRowsSentCount := app.rpcRowsSentTotal(t) + len(records)
app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent) app.cli.Post(t, url, "text/plain", strings.Join(records, "\n"), http.StatusNoContent)
app.waitUntilSent(t, wantRowsSentCount)
} }
// String returns the string representation of the vminsert app state. // String returns the string representation of the vminsert app state.
func (app *Vminsert) String() string { func (app *Vminsert) String() string {
return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr) return fmt.Sprintf("{app: %s httpListenAddr: %q}", app.app, app.httpListenAddr)
} }
// waitUntilSent waits until vminsert sends buffered data to vmstorage.
//
// Waiting is implemented a retrieving the value of `vm_rpc_rows_sent_total`
// metric and checking whether it is equal or greater than the wanted value.
// If it is, then the data has been sent to vmstorage.
//
// Unreliable if the records are inserted concurrently.
func (app *Vminsert) waitUntilSent(t *testing.T, wantRowsSentCount int) {
t.Helper()
const (
retries = 20
period = 100 * time.Millisecond
)
for range retries {
if app.rpcRowsSentTotal(t) >= wantRowsSentCount {
return
}
time.Sleep(period)
}
t.Fatalf("timed out while waiting for inserted rows to be sent to vmstorage")
}
// rpcRowsSentTotal retrieves the values of all vminsert
// `vm_rpc_rows_sent_total` metrics (there will be one for each vmstorage) and
// returns their integer sum.
func (app *Vminsert) rpcRowsSentTotal(t *testing.T) int {
total := 0.0
for _, v := range app.GetMetricsByPrefix(t, "vm_rpc_rows_sent_total") {
total += v
}
return int(total)
}