2024-10-30 14:22:06 +00:00
|
|
|
package tests
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"math/rand/v2"
|
|
|
|
"testing"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/apptest"
|
|
|
|
)
|
|
|
|
|
2024-11-07 11:58:37 +00:00
|
|
|
func TestClusterVminsertShardsDataVmselectBuildsFullResultFromShards(t *testing.T) {
|
2024-10-30 14:22:06 +00:00
|
|
|
tc := apptest.NewTestCase(t)
|
2024-11-08 13:49:00 +00:00
|
|
|
defer tc.Stop()
|
2024-10-30 14:22:06 +00:00
|
|
|
|
|
|
|
// Set up the following cluster configuration:
|
|
|
|
//
|
|
|
|
// - two vmstorage instances
|
|
|
|
// - vminsert points to the two vmstorages, its replication setting
|
|
|
|
// is off which means it will only shard the incoming data across the two
|
|
|
|
// vmstorages.
|
|
|
|
// - vmselect points to the two vmstorages and is expected to query both
|
|
|
|
// vmstorages and build the full result out of the two partial results.
|
|
|
|
|
2024-11-08 13:49:00 +00:00
|
|
|
vmstorage1 := tc.MustStartVmstorage("vmstorage-1", []string{
|
2024-10-30 14:22:06 +00:00
|
|
|
"-storageDataPath=" + tc.Dir() + "/vmstorage-1",
|
2024-11-08 13:49:00 +00:00
|
|
|
})
|
|
|
|
vmstorage2 := tc.MustStartVmstorage("vmstorage-2", []string{
|
2024-10-30 14:22:06 +00:00
|
|
|
"-storageDataPath=" + tc.Dir() + "/vmstorage-2",
|
2024-11-08 13:49:00 +00:00
|
|
|
})
|
|
|
|
vminsert := tc.MustStartVminsert("vminsert", []string{
|
2024-10-30 14:22:06 +00:00
|
|
|
"-storageNode=" + vmstorage1.VminsertAddr() + "," + vmstorage2.VminsertAddr(),
|
2024-11-08 13:49:00 +00:00
|
|
|
})
|
|
|
|
vmselect := tc.MustStartVmselect("vmselect", []string{
|
2024-10-30 14:22:06 +00:00
|
|
|
"-storageNode=" + vmstorage1.VmselectAddr() + "," + vmstorage2.VmselectAddr(),
|
2024-11-08 13:49:00 +00:00
|
|
|
})
|
2024-10-30 14:22:06 +00:00
|
|
|
|
2024-11-20 15:30:55 +00:00
|
|
|
// Insert 1000 unique time series.
|
2024-10-30 14:22:06 +00:00
|
|
|
|
|
|
|
const numMetrics = 1000
|
|
|
|
records := make([]string, numMetrics)
|
2024-11-20 15:30:55 +00:00
|
|
|
want := &apptest.PrometheusAPIV1SeriesResponse{
|
|
|
|
Status: "success",
|
|
|
|
IsPartial: false,
|
|
|
|
Data: make([]map[string]string, numMetrics),
|
|
|
|
}
|
2024-10-30 14:22:06 +00:00
|
|
|
for i := range numMetrics {
|
2024-11-20 15:30:55 +00:00
|
|
|
name := fmt.Sprintf("metric_%d", i)
|
|
|
|
records[i] = fmt.Sprintf("%s %d", name, rand.IntN(1000))
|
|
|
|
want.Data[i] = map[string]string{"__name__": name}
|
2024-10-30 14:22:06 +00:00
|
|
|
}
|
2024-11-20 15:30:55 +00:00
|
|
|
want.Sort()
|
|
|
|
qopts := apptest.QueryOpts{Tenant: "0"}
|
|
|
|
vminsert.PrometheusAPIV1ImportPrometheus(t, records, qopts)
|
|
|
|
vmstorage1.ForceFlush(t)
|
|
|
|
vmstorage2.ForceFlush(t)
|
|
|
|
|
|
|
|
// Verify that inserted data has been indeed sharded by checking metrics
|
|
|
|
// exposed by vmstorage.
|
2024-10-30 14:22:06 +00:00
|
|
|
|
|
|
|
numMetrics1 := vmstorage1.GetIntMetric(t, "vm_vminsert_metrics_read_total")
|
|
|
|
if numMetrics1 == 0 {
|
|
|
|
t.Fatalf("storage-1 has no time series")
|
|
|
|
}
|
|
|
|
numMetrics2 := vmstorage2.GetIntMetric(t, "vm_vminsert_metrics_read_total")
|
|
|
|
if numMetrics2 == 0 {
|
|
|
|
t.Fatalf("storage-2 has no time series")
|
|
|
|
}
|
|
|
|
if numMetrics1+numMetrics2 != numMetrics {
|
|
|
|
t.Fatalf("unxepected total number of metrics: vmstorage-1 (%d) + vmstorage-2 (%d) != %d", numMetrics1, numMetrics2, numMetrics)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Retrieve all time series and verify that vmselect serves the complete set
|
2024-11-20 15:30:55 +00:00
|
|
|
// of time series.
|
2024-10-30 14:22:06 +00:00
|
|
|
|
2024-11-20 15:30:55 +00:00
|
|
|
tc.Assert(&apptest.AssertOptions{
|
|
|
|
Msg: "unexpected /api/v1/series response",
|
|
|
|
Got: func() any {
|
|
|
|
res := vmselect.PrometheusAPIV1Series(t, `{__name__=~".*"}`, qopts)
|
|
|
|
res.Sort()
|
|
|
|
return res
|
|
|
|
},
|
|
|
|
Want: want,
|
|
|
|
})
|
2024-10-30 14:22:06 +00:00
|
|
|
}
|