VictoriaMetrics/apptest/tests/multilevel_test.go
Artem Fetishev c5fe5f96d4
apptest: Cluster replication tests (#7693)
### Describe Your Changes

Add cluster replication tests. No group replication yet. Some necessary
enhancements to the apptest framework have been done as well. Also other
existing tests were revisited to take advantage of new QueryOpts added
by @f41gh7 in #7635.

The tests verify the following scenarios:
1.  Data is written to vmstorages multiple times
2. Vmselect deduplicates replicated data
3. Vmselect does not return partial result if it receives responses from
enough replicas
4. Vmselect does not wait for responses from all replicas (skips slower
ones)

Something similar will be added for storage groups. These tests should
be used to prove that the fix for #6924 works and at the same time does
not break other aspects of replication.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: Artem Fetishev <rtm@victoriametrics.com>
2024-12-05 15:16:51 +01:00

72 lines
2 KiB
Go

package tests
import (
"fmt"
"math/rand/v2"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
)
// TestClusterMultilevelSelect verifies that, in a multi-level cluster setup,
// a second-level vmselect returns the same set of time series as the
// first-level vmselect it is pointed at.
//
// See: https://docs.victoriametrics.com/cluster-victoriametrics/#multi-level-cluster-setup
func TestClusterMultilevelSelect(t *testing.T) {
	tc := apptest.NewTestCase(t)
	defer tc.Stop()

	// Topology under test:
	//
	//   vmselect (L2) -> vmselect (L1) -> vmstorage <- vminsert
	//
	// vminsert writes data into vmstorage, while vmselect (L2) reads that
	// data by going through vmselect (L1).
	storageFlags := []string{"-storageDataPath=" + tc.Dir() + "/vmstorage"}
	storage := tc.MustStartVmstorage("vmstorage", storageFlags)

	insertFlags := []string{"-storageNode=" + storage.VminsertAddr()}
	insert := tc.MustStartVminsert("vminsert", insertFlags)

	level1Flags := []string{"-storageNode=" + storage.VmselectAddr()}
	level1 := tc.MustStartVmselect("vmselect-level1", level1Flags)

	// The second-level vmselect uses the first-level one as its storage node.
	level2Flags := []string{"-storageNode=" + level1.ClusternativeListenAddr()}
	level2 := tc.MustStartVmselect("vmselect-level2", level2Flags)

	// Generate 1000 uniquely named time series. Each one gets a random
	// sample value; only the series names take part in the expectation.
	const numMetrics = 1000
	records := make([]string, 0, numMetrics)
	series := make([]map[string]string, 0, numMetrics)
	for i := 0; i < numMetrics; i++ {
		name := fmt.Sprintf("metric_%d", i)
		records = append(records, fmt.Sprintf("%s %d", name, rand.IntN(1000)))
		series = append(series, map[string]string{"__name__": name})
	}
	want := &apptest.PrometheusAPIV1SeriesResponse{
		Status:    "success",
		IsPartial: false,
		Data:      series,
	}
	// Both the expected and the actual responses are sorted before they are
	// compared.
	want.Sort()

	// Ingest the records and flush vmstorage before querying.
	qopts := apptest.QueryOpts{Tenant: "0"}
	insert.PrometheusAPIV1ImportPrometheus(t, records, qopts)
	storage.ForceFlush(t)

	// assertSeries fetches every series from the given vmselect and checks
	// the sorted response against the expected one.
	const allSeries = `{__name__=~".*"}`
	assertSeries := func(app *apptest.Vmselect) {
		t.Helper()
		opts := &apptest.AssertOptions{
			Msg:  "unexpected /api/v1/series response",
			Want: want,
			Got: func() any {
				got := app.PrometheusAPIV1Series(t, allSeries, qopts)
				got.Sort()
				return got
			},
		}
		tc.Assert(opts)
	}

	// Both levels must serve the complete set of time series.
	assertSeries(level1)
	assertSeries(level2)
}