VictoriaMetrics/app/vmctl/vm_native_test.go
Dmytro Kozlov 4d68f5b1fc
app/vmctl: integration test for native protocol (#3947)
* app/vmctl: integration test for native protocol

* app/vmctl: implemented two integration tests

* app/vmctl: cleanup

* app/vmctl: split storage init and filling data logic

* app/vmctl: cleanup

* app/vmctl: remove storage from server, used initialization process

* app/vmctl: prepare for parallel run, code cleanup

* app/vmctl: code cleanup

* app/vmctl: remove unused field
2023-03-14 09:55:49 +01:00

296 lines
8 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/backoff"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/native"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/stepper"
remote_read_integration "github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/testdata/servers_integration_test"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
// storagePath is the value processFlags assigns to the "storageDataPath" flag;
// the test removes this path with os.RemoveAll when it finishes.
const storagePath = "TestStorage"

// retentionPeriod is the value processFlags assigns to the "retentionPeriod" flag.
const retentionPeriod = "100y"
// Test_vmNativeProcessor_run is an integration test for vmNativeProcessor.run.
//
// It initializes vmstorage once for the whole test (using the flag values set
// by processFlags), then for every table entry:
//   - starts a source and a destination test server,
//   - generates series for the configured time range and writes them into
//     the storage via fillStorage,
//   - runs the native processor from the source to the destination,
//   - deletes the written series and verifies how many were deleted.
//
// The storage is stopped and its directory removed when the test returns.
func Test_vmNativeProcessor_run(t *testing.T) {
	processFlags()
	vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
	defer func() {
		vmstorage.Stop()
		// Report cleanup problems via t instead of log.Fatalf: log.Fatalf calls
		// os.Exit(1), which would terminate the whole test binary and bypass
		// the testing framework's own failure reporting.
		if err := os.RemoveAll(storagePath); err != nil {
			t.Errorf("cannot remove %q: %s", storagePath, err)
		}
	}()

	// fields mirrors the vmNativeProcessor configuration plus the label
	// matcher (matchName/matchValue) used to build the filter and to delete
	// the series afterwards.
	type fields struct {
		filter       native.Filter
		dst          *native.Client
		src          *native.Client
		backoff      *backoff.Backoff
		s            *stats
		rateLimit    int64
		interCluster bool
		cc           int
		matchName    string
		matchValue   string
	}
	// args holds the arguments passed to vmNativeProcessor.run.
	type args struct {
		ctx    context.Context
		silent bool
	}
	tests := []struct {
		name           string
		fields         fields
		args           args
		vmSeries       func(start, end, numOfSeries, numOfSamples int64) []vm.TimeSeries
		expectedSeries []vm.TimeSeries
		start          string
		end            string
		numOfSamples   int64
		numOfSeries    int64
		chunk          string
		wantErr        bool
	}{
		{
			name:         "step minute on minute time range",
			start:        "2022-11-25T11:23:05+02:00",
			end:          "2022-11-27T11:24:05+02:00",
			numOfSamples: 2,
			numOfSeries:  3,
			chunk:        stepper.StepMinute,
			fields: fields{
				filter:       native.Filter{},
				backoff:      backoff.New(),
				rateLimit:    0,
				interCluster: false,
				cc:           1,
				matchName:    "__name__",
				matchValue:   ".*",
			},
			args: args{
				ctx:    context.Background(),
				silent: true,
			},
			vmSeries: remote_read_integration.GenerateVNSeries,
			expectedSeries: []vm.TimeSeries{
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
					Timestamps: []int64{1669368185000, 1669454615000},
					Values:     []float64{0, 0},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
					Timestamps: []int64{1669368185000, 1669454615000},
					Values:     []float64{100, 100},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
					Timestamps: []int64{1669368185000, 1669454615000},
					Values:     []float64{200, 200},
				},
			},
			wantErr: false,
		},
		{
			name:         "step month on month time range",
			start:        "2022-09-26T11:23:05+02:00",
			end:          "2022-11-26T11:24:05+02:00",
			numOfSamples: 2,
			numOfSeries:  3,
			chunk:        stepper.StepMonth,
			fields: fields{
				filter:       native.Filter{},
				backoff:      backoff.New(),
				rateLimit:    0,
				interCluster: false,
				cc:           1,
				matchName:    "__name__",
				matchValue:   ".*",
			},
			args: args{
				ctx:    context.Background(),
				silent: true,
			},
			vmSeries: remote_read_integration.GenerateVNSeries,
			// Each job appears twice with a single sample per entry.
			// NOTE(review): presumably one entry per exported chunk — confirm
			// against the destination test server's expectations.
			expectedSeries: []vm.TimeSeries{
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
					Timestamps: []int64{1664184185000},
					Values:     []float64{0},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "0"}},
					Timestamps: []int64{1666819415000},
					Values:     []float64{0},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
					Timestamps: []int64{1664184185000},
					Values:     []float64{100},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "1"}},
					Timestamps: []int64{1666819415000},
					Values:     []float64{100},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
					Timestamps: []int64{1664184185000},
					Values:     []float64{200},
				},
				{
					Name:       "vm_metric_1",
					LabelPairs: []vm.LabelPair{{Name: "job", Value: "2"}},
					Timestamps: []int64{1666819415000},
					Values:     []float64{200},
				},
			},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			src := remote_read_integration.NewRemoteWriteServer(t)
			dst := remote_read_integration.NewRemoteWriteServer(t)
			defer func() {
				src.Close()
				dst.Close()
			}()

			start, err := time.Parse(time.RFC3339, tt.start)
			if err != nil {
				t.Fatalf("Error parse start time: %s", err)
			}
			end, err := time.Parse(time.RFC3339, tt.end)
			if err != nil {
				t.Fatalf("Error parse end time: %s", err)
			}

			// The filter selects series by the configured label matcher within
			// the [start, end] range given as the original RFC3339 strings.
			tt.fields.filter.Match = fmt.Sprintf("%s=%q", tt.fields.matchName, tt.fields.matchValue)
			tt.fields.filter.TimeStart = tt.start
			tt.fields.filter.TimeEnd = tt.end

			// The generator receives the range boundaries as Unix seconds.
			rws := tt.vmSeries(start.Unix(), end.Unix(), tt.numOfSeries, tt.numOfSamples)

			src.Series(rws)
			dst.ExpectedSeries(tt.expectedSeries)

			if err := fillStorage(rws); err != nil {
				t.Fatalf("error add series to storage: %s", err)
			}

			tt.fields.src = &native.Client{
				AuthCfg:              nil,
				Addr:                 src.URL(),
				ExtraLabels:          []string{},
				DisableHTTPKeepAlive: false,
			}
			tt.fields.dst = &native.Client{
				AuthCfg:              nil,
				Addr:                 dst.URL(),
				ExtraLabels:          []string{},
				DisableHTTPKeepAlive: false,
			}

			p := &vmNativeProcessor{
				filter:       tt.fields.filter,
				dst:          tt.fields.dst,
				src:          tt.fields.src,
				backoff:      tt.fields.backoff,
				s:            tt.fields.s,
				rateLimit:    tt.fields.rateLimit,
				interCluster: tt.fields.interCluster,
				cc:           tt.fields.cc,
			}

			if err := p.run(tt.args.ctx, tt.args.silent); (err != nil) != tt.wantErr {
				t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr)
			}

			// Remove the series written by this subtest so the shared storage
			// is empty for the next one; the deleted count doubles as a check
			// that exactly numOfSeries series were stored.
			deleted, err := deleteSeries(tt.fields.matchName, tt.fields.matchValue)
			if err != nil {
				t.Fatalf("error delete series: %s", err)
			}
			if int64(deleted) != tt.numOfSeries {
				t.Fatalf("expected deleted series %d; got deleted series %d", tt.numOfSeries, deleted)
			}
		})
	}
}
// processFlags parses the command-line flags and then overrides the
// "storageDataPath" and "retentionPeriod" flags with storagePath and
// retentionPeriod respectively.
//
// The process is terminated via log.Fatalf if one of these flags is not
// registered or if its value cannot be set.
func processFlags() {
	flag.Parse()

	overrides := []struct {
		flag  string
		value string
	}{
		{flag: "storageDataPath", value: storagePath},
		{flag: "retentionPeriod", value: retentionPeriod},
	}
	for _, fv := range overrides {
		// flag.Lookup returns nil for an unknown flag; check it explicitly so
		// the failure names the missing flag instead of surfacing as a
		// nil-pointer panic.
		f := flag.Lookup(fv.flag)
		if f == nil {
			log.Fatalf("unable to set %q with value %q: flag is not registered", fv.flag, fv.value)
		}
		if err := f.Value.Set(fv.value); err != nil {
			log.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
		}
	}
}
// fillStorage converts the given series into storage.MetricRow entries (one
// row per sample), passes them to vmstorage.AddRows and then calls
// vmstorage.Storage.DebugFlush.
//
// A non-empty series name is stored as the "__name__" label, appended after
// the series' own label pairs. An error is returned if a series has more
// values than timestamps or if vmstorage.AddRows fails.
func fillStorage(series []vm.TimeSeries) error {
	var mrs []storage.MetricRow
	for _, ts := range series {
		// Every value needs a timestamp at the same index; fail with a clear
		// error instead of panicking on an out-of-range index below.
		if len(ts.Values) > len(ts.Timestamps) {
			return fmt.Errorf("series %q has %d values but only %d timestamps",
				ts.Name, len(ts.Values), len(ts.Timestamps))
		}

		labels := make([]prompb.Label, 0, len(ts.LabelPairs)+1)
		for _, lp := range ts.LabelPairs {
			labels = append(labels, prompb.Label{Name: []byte(lp.Name), Value: []byte(lp.Value)})
		}
		if ts.Name != "" {
			labels = append(labels, prompb.Label{Name: []byte("__name__"), Value: []byte(ts.Name)})
		}

		// The marshaled metric name is computed once per series and shared by
		// all rows of that series.
		mr := storage.MetricRow{}
		mr.MetricNameRaw = storage.MarshalMetricNameRaw(mr.MetricNameRaw[:0], labels)

		for i, value := range ts.Values {
			mr.Timestamp = ts.Timestamps[i]
			mr.Value = value
			mrs = append(mrs, mr)
		}
	}

	if err := vmstorage.AddRows(mrs); err != nil {
		return fmt.Errorf("unexpected error in AddRows: %w", err)
	}
	// NOTE(review): presumably makes the added rows visible to subsequent
	// reads — confirm against vmstorage.Storage.DebugFlush.
	vmstorage.Storage.DebugFlush()
	return nil
}
// deleteSeries builds a single tag filter from name and value and passes it
// to vmstorage.DeleteSeries, returning that call's count and error unchanged.
// If the filter cannot be built, it returns 0 and a wrapped error.
func deleteSeries(name, value string) (int, error) {
	filters := storage.NewTagFilters()

	// NOTE(review): the trailing booleans are presumably isNegative=false and
	// isRegexp=true — confirm against storage.TagFilters.Add.
	err := filters.Add([]byte(name), []byte(value), false, true)
	if err != nil {
		return 0, fmt.Errorf("unexpected error in TagFilters.Add: %w", err)
	}

	deleted, err := vmstorage.DeleteSeries(nil, []*storage.TagFilters{filters})
	return deleted, err
}